Skip to content

Commit

Permalink
Merge pull request #840 from aaxelb/feat/multiple-current-indexes
Browse files Browse the repository at this point in the history
[ENG-6708] IndexStrategy revamp (more than one index per strategy, wow)
  • Loading branch information
aaxelb authored Jan 28, 2025
2 parents c953024 + 2d267eb commit f9c499e
Show file tree
Hide file tree
Showing 46 changed files with 1,907 additions and 1,436 deletions.
5 changes: 5 additions & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ Multiple records which describe the same item/object are grouped by a
the source repository. In most outward-facing views, default to showing only
the most recent record for each suid.

### Conventions
(an incomplete list)

- functions prefixed `pls_` ("please") are a request for something to happen

## Why this?
inspired by [this writeup](https://matklad.github.io/2021/02/06/ARCHITECTURE.md.html)
and [this example architecture document](https://github.com/rust-analyzer/rust-analyzer/blob/d7c99931d05e3723d878bea5dc26766791fa4e69/docs/dev/architecture.md)
6 changes: 3 additions & 3 deletions api/search/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def post(self, request):

def _handle_request(self, request):
queryparams = request.query_params.dict()
requested_index_strategy = queryparams.pop('indexStrategy', None)
requested_index_strategy = queryparams.get('indexStrategy', None)
if 'scroll' in queryparams:
return http.HttpResponseForbidden(reason='Scroll is not supported.')
try:
specific_index = index_strategy.get_index_for_sharev2_search(requested_index_strategy)
_index_strategy = index_strategy.get_strategy_for_sharev2_search(requested_index_strategy)
except exceptions.IndexStrategyError as error:
raise http.Http404(str(error))
try:
response_json = specific_index.pls_handle_search__sharev2_backcompat(
response_json = _index_strategy.pls_handle_search__passthru(
request_body=request.data,
request_queryparams=queryparams,
)
Expand Down
6 changes: 3 additions & 3 deletions api/views/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MetadataRecordsRSS(Feed):
description = 'Updates to the SHARE open dataset'
author_name = 'SHARE'

_search_index: index_strategy.IndexStrategy.SpecificIndex
_search_strategy: index_strategy.IndexStrategy

def title(self, obj):
query = json.dumps(obj.get('query', 'All'))
Expand All @@ -43,7 +43,7 @@ def title(self, obj):
def get_object(self, request):
self._order = request.GET.get('order')
elastic_query = request.GET.get('elasticQuery')
self._search_index = index_strategy.get_index_for_sharev2_search(request.GET.get('indexStrategy'))
self._search_strategy = index_strategy.get_strategy_for_sharev2_search(request.GET.get('indexStrategy'))

if self._order not in {'date_modified', 'date_updated', 'date_created', 'date_published'}:
self._order = 'date_modified'
Expand All @@ -64,7 +64,7 @@ def get_object(self, request):

def items(self, obj):
try:
json_response = self._search_index.pls_handle_search__sharev2_backcompat(
json_response = self._search_strategy.pls_handle_search__passthru(
request_body=obj,
)
except IndexStrategyError:
Expand Down
6 changes: 3 additions & 3 deletions share/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,15 @@ class FormattedMetadataRecordAdmin(admin.ModelAdmin):
class IndexBackfillAdmin(admin.ModelAdmin):
readonly_fields = (
'index_strategy_name',
'specific_indexname',
'strategy_checksum',
'error_type',
'error_message',
'error_context',
)
paginator = TimeLimitedPaginator
list_display = ('index_strategy_name', 'backfill_status', 'created', 'modified', 'specific_indexname')
list_display = ('index_strategy_name', 'backfill_status', 'created', 'modified', 'strategy_checksum')
show_full_result_count = False
search_fields = ('index_strategy_name', 'specific_indexname',)
search_fields = ('index_strategy_name', 'strategy_checksum',)
actions = ('reset_to_initial',)

def reset_to_initial(self, request, queryset):
Expand Down
91 changes: 43 additions & 48 deletions share/admin/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
from share.admin.util import admin_url
from share.models.index_backfill import IndexBackfill
from share.search.index_messenger import IndexMessenger
from share.search import index_strategy
from share.search.index_strategy import (
IndexStrategy,
all_strategy_names,
each_strategy,
parse_strategy_name,
parse_specific_index_name,
)


logger = logging.getLogger(__name__)
Expand All @@ -25,19 +31,15 @@ def search_indexes_view(request):
},
)
if request.method == 'POST':
_specific_index = index_strategy.get_specific_index(request.POST['specific_indexname'])
_index_strategy = parse_strategy_name(request.POST['strategy_name'])
_pls_doer = PLS_DOERS[request.POST['pls_do']]
_pls_doer(_specific_index)
_redirect_id = (
_specific_index.index_strategy.name
if _pls_doer is _pls_delete
else _specific_index.indexname
)
_pls_doer(_index_strategy)
_redirect_id = _index_strategy.strategy_name
return HttpResponseRedirect('#'.join((request.path, _redirect_id)))


def search_index_mappings_view(request, index_name):
_specific_index = index_strategy.get_specific_index(index_name)
_specific_index = parse_specific_index_name(index_name)
_mappings = _specific_index.pls_get_mappings()
return JsonResponse(_mappings)

Expand All @@ -52,30 +54,23 @@ def _mappings_url_prefix():


def _index_status_by_strategy():
backfill_by_indexname: dict[str, IndexBackfill] = {
backfill.specific_indexname: backfill
for backfill in (
_backfill_by_checksum: dict[str, IndexBackfill] = {
_backfill.strategy_checksum: _backfill
for _backfill in (
IndexBackfill.objects
.filter(index_strategy_name__in=index_strategy.all_index_strategies().keys())
.filter(index_strategy_name__in=all_strategy_names())
)
}
status_by_strategy = {}
_messenger = IndexMessenger()
for _index_strategy in index_strategy.all_index_strategies().values():
current_index = _index_strategy.for_current_index()
status_by_strategy[_index_strategy.name] = {
'current': {
'status': current_index.pls_get_status(),
'backfill': _serialize_backfill(
current_index,
backfill_by_indexname.get(current_index.indexname),
),
},
'prior': sorted((
specific_index.pls_get_status()
for specific_index in _index_strategy.each_specific_index()
if not specific_index.is_current
), reverse=True),
for _index_strategy in each_strategy():
_current_backfill = (
_backfill_by_checksum.get(str(_index_strategy.CURRENT_STRATEGY_CHECKSUM))
or _backfill_by_checksum.get(_index_strategy.indexname_prefix) # backcompat
)
status_by_strategy[_index_strategy.strategy_name] = {
'status': _index_strategy.pls_get_strategy_status(),
'backfill': _serialize_backfill(_index_strategy, _current_backfill),
'queues': [
{
'name': _queue_name,
Expand All @@ -91,14 +86,14 @@ def _index_status_by_strategy():


def _serialize_backfill(
specific_index: index_strategy.IndexStrategy.SpecificIndex,
strategy: IndexStrategy,
backfill: IndexBackfill | None,
):
if not specific_index.is_current:
if not strategy.is_current:
return {}
if not backfill:
return {
'can_start_backfill': specific_index.pls_check_exists(),
'can_start_backfill': strategy.pls_check_exists(),
}
return {
'backfill_status': backfill.backfill_status,
Expand All @@ -109,35 +104,35 @@ def _serialize_backfill(
}


def _pls_setup(specific_index):
assert specific_index.is_current
specific_index.pls_setup()
def _pls_setup(index_strategy: IndexStrategy):
assert index_strategy.is_current
index_strategy.pls_setup()


def _pls_start_keeping_live(specific_index):
specific_index.pls_start_keeping_live()
def _pls_start_keeping_live(index_strategy: IndexStrategy):
index_strategy.pls_start_keeping_live()


def _pls_stop_keeping_live(specific_index):
specific_index.pls_stop_keeping_live()
def _pls_stop_keeping_live(index_strategy: IndexStrategy):
index_strategy.pls_stop_keeping_live()


def _pls_start_backfill(specific_index):
assert specific_index.is_current
specific_index.index_strategy.pls_start_backfill()
def _pls_start_backfill(index_strategy: IndexStrategy):
assert index_strategy.is_current
index_strategy.pls_start_backfill()


def _pls_mark_backfill_complete(specific_index):
specific_index.index_strategy.pls_mark_backfill_complete()
def _pls_mark_backfill_complete(index_strategy: IndexStrategy):
index_strategy.pls_mark_backfill_complete()


def _pls_make_default_for_searching(specific_index):
specific_index.index_strategy.pls_make_default_for_searching(specific_index)
def _pls_make_default_for_searching(index_strategy: IndexStrategy):
index_strategy.pls_make_default_for_searching()


def _pls_delete(specific_index):
assert not specific_index.is_current
specific_index.pls_delete()
def _pls_delete(index_strategy: IndexStrategy):
assert not index_strategy.is_current
index_strategy.pls_teardown()


PLS_DOERS = {
Expand Down
29 changes: 10 additions & 19 deletions share/bin/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ def search(args, argv):
@search.subcommand('Drop the Elasticsearch index')
def purge(args, argv):
"""
Usage: {0} search purge <index_names>...
Usage: {0} search purge <strategy_names>...
"""
for index_name in args['<index_names>']:
specific_index = index_strategy.get_specific_index(index_name)
specific_index.pls_delete()
for _strategy_name in args['<strategy_names>']:
_strategy = index_strategy.parse_strategy_name(_strategy_name)
_strategy.pls_teardown()


@search.subcommand('Create indicies and apply mappings')
Expand All @@ -41,25 +41,16 @@ def setup(args, argv):
"""
_is_initial = args.get('--initial')
if _is_initial:
_specific_indexes = [
_index_strategy.for_current_index()
for _index_strategy in index_strategy.all_index_strategies().values()
]
for _index_strategy in index_strategy.each_strategy():
_index_strategy.pls_setup()
else:
_index_or_strategy_name = args['<index_or_strategy_name>']
try:
_specific_indexes = [index_strategy.get_specific_index(_index_or_strategy_name)]
_strategy = index_strategy.get_strategy(_index_or_strategy_name)
except IndexStrategyError:
try:
_specific_indexes = [
index_strategy.get_specific_index(_index_or_strategy_name),
]
except IndexStrategyError:
raise IndexStrategyError(f'unrecognized index or strategy name "{_index_or_strategy_name}"')
for _specific_index in _specific_indexes:
_specific_index.pls_setup(
skip_backfill=_is_initial, # for initial setup, there's nothing back to fill
)
raise IndexStrategyError(f'unrecognized index or strategy name "{_index_or_strategy_name}"')
else:
_strategy.pls_setup()


@search.subcommand('Start the search indexing daemon')
Expand Down
2 changes: 1 addition & 1 deletion share/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def check_all_index_strategies_current(app_configs, **kwargs):
from share.search import index_strategy
from share.search.exceptions import IndexStrategyError
errors = []
for _index_strategy in index_strategy.all_index_strategies().values():
for _index_strategy in index_strategy.each_strategy():
try:
_index_strategy.assert_strategy_is_current()
except IndexStrategyError as exception:
Expand Down
18 changes: 14 additions & 4 deletions share/models/index_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ def __repr__(self):
def __str__(self):
return repr(self)

@property
def strategy_checksum(self):
# back-compat alias for specific_indexname (may be removed if that's renamed via migration)
return self.specific_indexname # for backcompat

@strategy_checksum.setter
def strategy_checksum(self, value):
# back-compat alias for specific_indexname (may be removed if that's renamed via migration)
self.specific_indexname = value

@contextlib.contextmanager
def mutex(self):
with IndexBackfill.objects.get_with_mutex(pk=self.pk) as index_backfill:
Expand All @@ -76,14 +86,14 @@ def mutex(self):

def pls_start(self, index_strategy):
with self.mutex() as locked_self:
assert locked_self.index_strategy_name == index_strategy.name
current_index = index_strategy.for_current_index()
if locked_self.specific_indexname == current_index.indexname:
assert locked_self.index_strategy_name == index_strategy.strategy_name
_current_checksum = str(index_strategy.CURRENT_STRATEGY_CHECKSUM)
if locked_self.strategy_checksum == _current_checksum:
# what is "current" has not changed -- should be INITIAL
assert locked_self.backfill_status == IndexBackfill.INITIAL
else:
# what is "current" has changed! disregard backfill_status
locked_self.specific_indexname = current_index.indexname
locked_self.strategy_checksum = _current_checksum
locked_self.backfill_status = IndexBackfill.INITIAL
locked_self.__update_error(None)
try:
Expand Down
13 changes: 7 additions & 6 deletions share/search/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def start_daemonthreads_for_strategy(self, index_strategy):
return _daemon

def start_all_daemonthreads(self):
for _index_strategy in index_strategy.all_index_strategies().values():
for _index_strategy in index_strategy.each_strategy():
self.start_daemonthreads_for_strategy(_index_strategy)

def stop_daemonthreads(self, *, wait=False):
Expand Down Expand Up @@ -119,7 +119,7 @@ def get_consumers(self, Consumer, channel):
]

def __repr__(self):
return '<{}({})>'.format(self.__class__.__name__, self.__index_strategy.name)
return '<{}({})>'.format(self.__class__.__name__, self.__index_strategy.strategy_name)

def consume(self, *args, **kwargs):
# wrap `consume` in `kombu.Connection.ensure`, following guidance from
Expand Down Expand Up @@ -191,7 +191,7 @@ def on_message(self, body, message):
continue

def __repr__(self):
return '<{}({})>'.format(self.__class__.__name__, self.index_strategy.name)
return '<{}({})>'.format(self.__class__.__name__, self.index_strategy.strategy_name)


@dataclasses.dataclass
Expand Down Expand Up @@ -232,11 +232,12 @@ def _the_loop_itself(self):
def _raise_if_backfill_noncurrent(self):
if self.message_type.is_backfill:
index_backfill = self.index_strategy.get_or_create_backfill()
if index_backfill.specific_indexname != self.index_strategy.current_indexname:
_current_checksum = str(self.index_strategy.CURRENT_STRATEGY_CHECKSUM)
if index_backfill.strategy_checksum != _current_checksum:
raise exceptions.DaemonSetupError(
'IndexerDaemon observes conflicting currence:'
f'\n\tIndexBackfill (from database) says current is "{index_backfill.specific_indexname}"'
f'\n\tIndexStrategy (from static code) says current is "{self.index_strategy.current_indexname}"'
f'\n\tIndexBackfill (from database) says current is "{index_backfill.strategy_checksum}"'
f'\n\tIndexStrategy (from static code) says current is "{_current_checksum}"'
'\n\t(may be the daemon is running old code -- will die and retry,'
' but if this keeps happening you may need to reset backfill_status'
' to INITIAL and restart the backfill)'
Expand Down
2 changes: 1 addition & 1 deletion share/search/index_messenger.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, *, celery_app=None, index_strategys=None):
if celery_app is None
else celery_app
)
self.index_strategys = index_strategys or tuple(index_strategy.all_index_strategies().values())
self.index_strategys = index_strategys or tuple(index_strategy.each_strategy())

def notify_indexcard_update(self, indexcards, *, urgent=False):
self.send_messages_chunk(
Expand Down
Loading

0 comments on commit f9c499e

Please sign in to comment.