Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENG-6708] IndexStrategy revamp (more than one index per strategy, wow) #840

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a16afeb
wip
aaxelb Jan 6, 2025
0a7dc4b
s/each_specific_index/each_existing_index
aaxelb Jan 7, 2025
e373888
s/for_specific_index/get_index_by_name
aaxelb Jan 7, 2025
99eae9f
plan (add _TODO_multindex.txt)
aaxelb Jan 7, 2025
86ef59f
wip...
aaxelb Jan 7, 2025
5e65e56
wip.....
aaxelb Jan 7, 2025
ac8f49b
wip....
aaxelb Jan 9, 2025
0cce300
wip.......
aaxelb Jan 9, 2025
8f4fa1c
wip (remove unused before_chunk; make after_chunk multiindex-friendly)
aaxelb Jan 10, 2025
b125f19
wip (simplify share.search.index_strategy)
aaxelb Jan 10, 2025
042a944
wip...
aaxelb Jan 15, 2025
ff96a19
wip..
aaxelb Jan 15, 2025
1860f35
wip....
aaxelb Jan 16, 2025
0a7a5c4
wip.....
aaxelb Jan 16, 2025
e59629f
wip......
aaxelb Jan 16, 2025
8c3350b
wip.......
aaxelb Jan 16, 2025
0d84aef
wip.....
aaxelb Jan 17, 2025
ae980b7
wip...
aaxelb Jan 17, 2025
4b1fea9
wip..
aaxelb Jan 17, 2025
7eaac3f
wip...
aaxelb Jan 17, 2025
d194be8
wip.....
aaxelb Jan 17, 2025
81581e3
wip......
aaxelb Jan 17, 2025
63d7c59
wip.......
aaxelb Jan 17, 2025
4e6ecd3
wip.........
aaxelb Jan 17, 2025
7223007
wip....
aaxelb Jan 17, 2025
b886653
wip... (fixes...)
aaxelb Jan 21, 2025
4bbcfbf
remove unnecessary migration
aaxelb Jan 21, 2025
a9a9b70
wip... (fix valuesearch)
aaxelb Jan 21, 2025
cd36203
wip.....
aaxelb Jan 21, 2025
332434a
remove temp TODO
aaxelb Jan 23, 2025
9a6c904
tidy names
aaxelb Jan 23, 2025
4e52c47
respond to group review
aaxelb Jan 24, 2025
7f17b1e
fix: respect small page[size] while streaming
aaxelb Jan 28, 2025
7c8396b
improve hacks around gathering
aaxelb Jan 28, 2025
2d267eb
more legible 'simple' json
aaxelb Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ Multiple records which describe the same item/object are grouped by a
the source repository. In most outward-facing views, default to showing only
the most recent record for each suid.

### Conventions
(an incomplete list)

- functions prefixed `pls_` ("please") are a request for something to happen

## Why this?
inspired by [this writeup](https://matklad.github.io/2021/02/06/ARCHITECTURE.md.html)
and [this example architecture document](https://github.com/rust-analyzer/rust-analyzer/blob/d7c99931d05e3723d878bea5dc26766791fa4e69/docs/dev/architecture.md)
6 changes: 3 additions & 3 deletions api/search/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ def post(self, request):

def _handle_request(self, request):
queryparams = request.query_params.dict()
requested_index_strategy = queryparams.pop('indexStrategy', None)
requested_index_strategy = queryparams.get('indexStrategy', None)
if 'scroll' in queryparams:
return http.HttpResponseForbidden(reason='Scroll is not supported.')
try:
specific_index = index_strategy.get_index_for_sharev2_search(requested_index_strategy)
_index_strategy = index_strategy.get_strategy_for_sharev2_search(requested_index_strategy)
except exceptions.IndexStrategyError as error:
raise http.Http404(str(error))
try:
response_json = specific_index.pls_handle_search__sharev2_backcompat(
response_json = _index_strategy.pls_handle_search__passthru(
request_body=request.data,
request_queryparams=queryparams,
)
Expand Down
6 changes: 3 additions & 3 deletions api/views/feeds.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MetadataRecordsRSS(Feed):
description = 'Updates to the SHARE open dataset'
author_name = 'SHARE'

_search_index: index_strategy.IndexStrategy.SpecificIndex
_search_strategy: index_strategy.IndexStrategy

def title(self, obj):
query = json.dumps(obj.get('query', 'All'))
Expand All @@ -43,7 +43,7 @@ def title(self, obj):
def get_object(self, request):
self._order = request.GET.get('order')
elastic_query = request.GET.get('elasticQuery')
self._search_index = index_strategy.get_index_for_sharev2_search(request.GET.get('indexStrategy'))
self._search_strategy = index_strategy.get_strategy_for_sharev2_search(request.GET.get('indexStrategy'))

if self._order not in {'date_modified', 'date_updated', 'date_created', 'date_published'}:
self._order = 'date_modified'
Expand All @@ -64,7 +64,7 @@ def get_object(self, request):

def items(self, obj):
try:
json_response = self._search_index.pls_handle_search__sharev2_backcompat(
json_response = self._search_strategy.pls_handle_search__passthru(
request_body=obj,
)
except IndexStrategyError:
Expand Down
6 changes: 3 additions & 3 deletions share/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,15 @@ class FormattedMetadataRecordAdmin(admin.ModelAdmin):
class IndexBackfillAdmin(admin.ModelAdmin):
readonly_fields = (
'index_strategy_name',
'specific_indexname',
'strategy_checksum',
'error_type',
'error_message',
'error_context',
)
paginator = TimeLimitedPaginator
list_display = ('index_strategy_name', 'backfill_status', 'created', 'modified', 'specific_indexname')
list_display = ('index_strategy_name', 'backfill_status', 'created', 'modified', 'strategy_checksum')
show_full_result_count = False
search_fields = ('index_strategy_name', 'specific_indexname',)
search_fields = ('index_strategy_name', 'strategy_checksum',)
actions = ('reset_to_initial',)

def reset_to_initial(self, request, queryset):
Expand Down
91 changes: 43 additions & 48 deletions share/admin/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
from share.admin.util import admin_url
from share.models.index_backfill import IndexBackfill
from share.search.index_messenger import IndexMessenger
from share.search import index_strategy
from share.search.index_strategy import (
IndexStrategy,
all_strategy_names,
each_strategy,
parse_strategy_name,
parse_specific_index_name,
)


logger = logging.getLogger(__name__)
Expand All @@ -25,19 +31,15 @@ def search_indexes_view(request):
},
)
if request.method == 'POST':
_specific_index = index_strategy.get_specific_index(request.POST['specific_indexname'])
_index_strategy = parse_strategy_name(request.POST['strategy_name'])
_pls_doer = PLS_DOERS[request.POST['pls_do']]
_pls_doer(_specific_index)
_redirect_id = (
_specific_index.index_strategy.name
if _pls_doer is _pls_delete
else _specific_index.indexname
)
_pls_doer(_index_strategy)
_redirect_id = _index_strategy.strategy_name
return HttpResponseRedirect('#'.join((request.path, _redirect_id)))


def search_index_mappings_view(request, index_name):
_specific_index = index_strategy.get_specific_index(index_name)
_specific_index = parse_specific_index_name(index_name)
_mappings = _specific_index.pls_get_mappings()
return JsonResponse(_mappings)

Expand All @@ -52,30 +54,23 @@ def _mappings_url_prefix():


def _index_status_by_strategy():
backfill_by_indexname: dict[str, IndexBackfill] = {
backfill.specific_indexname: backfill
for backfill in (
_backfill_by_checksum: dict[str, IndexBackfill] = {
_backfill.strategy_checksum: _backfill
for _backfill in (
IndexBackfill.objects
.filter(index_strategy_name__in=index_strategy.all_index_strategies().keys())
.filter(index_strategy_name__in=all_strategy_names())
)
}
status_by_strategy = {}
_messenger = IndexMessenger()
for _index_strategy in index_strategy.all_index_strategies().values():
current_index = _index_strategy.for_current_index()
status_by_strategy[_index_strategy.name] = {
'current': {
'status': current_index.pls_get_status(),
'backfill': _serialize_backfill(
current_index,
backfill_by_indexname.get(current_index.indexname),
),
},
'prior': sorted((
specific_index.pls_get_status()
for specific_index in _index_strategy.each_specific_index()
if not specific_index.is_current
), reverse=True),
for _index_strategy in each_strategy():
_current_backfill = (
_backfill_by_checksum.get(str(_index_strategy.CURRENT_STRATEGY_CHECKSUM))
or _backfill_by_checksum.get(_index_strategy.indexname_prefix) # backcompat
)
status_by_strategy[_index_strategy.strategy_name] = {
'status': _index_strategy.pls_get_strategy_status(),
'backfill': _serialize_backfill(_index_strategy, _current_backfill),
'queues': [
{
'name': _queue_name,
Expand All @@ -91,14 +86,14 @@ def _index_status_by_strategy():


def _serialize_backfill(
specific_index: index_strategy.IndexStrategy.SpecificIndex,
strategy: IndexStrategy,
backfill: IndexBackfill | None,
):
if not specific_index.is_current:
if not strategy.is_current:
return {}
if not backfill:
return {
'can_start_backfill': specific_index.pls_check_exists(),
'can_start_backfill': strategy.pls_check_exists(),
}
return {
'backfill_status': backfill.backfill_status,
Expand All @@ -109,35 +104,35 @@ def _serialize_backfill(
}


def _pls_setup(specific_index):
assert specific_index.is_current
specific_index.pls_setup()
def _pls_setup(index_strategy: IndexStrategy):
assert index_strategy.is_current
index_strategy.pls_setup()


def _pls_start_keeping_live(specific_index):
specific_index.pls_start_keeping_live()
def _pls_start_keeping_live(index_strategy: IndexStrategy):
index_strategy.pls_start_keeping_live()


def _pls_stop_keeping_live(specific_index):
specific_index.pls_stop_keeping_live()
def _pls_stop_keeping_live(index_strategy: IndexStrategy):
index_strategy.pls_stop_keeping_live()


def _pls_start_backfill(specific_index):
assert specific_index.is_current
specific_index.index_strategy.pls_start_backfill()
def _pls_start_backfill(index_strategy: IndexStrategy):
assert index_strategy.is_current
index_strategy.pls_start_backfill()


def _pls_mark_backfill_complete(specific_index):
specific_index.index_strategy.pls_mark_backfill_complete()
def _pls_mark_backfill_complete(index_strategy: IndexStrategy):
index_strategy.pls_mark_backfill_complete()


def _pls_make_default_for_searching(specific_index):
specific_index.index_strategy.pls_make_default_for_searching(specific_index)
def _pls_make_default_for_searching(index_strategy: IndexStrategy):
index_strategy.pls_make_default_for_searching()


def _pls_delete(specific_index):
assert not specific_index.is_current
specific_index.pls_delete()
def _pls_delete(index_strategy: IndexStrategy):
assert not index_strategy.is_current
index_strategy.pls_teardown()


PLS_DOERS = {
Expand Down
29 changes: 10 additions & 19 deletions share/bin/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ def search(args, argv):
@search.subcommand('Drop the Elasticsearch index')
def purge(args, argv):
"""
Usage: {0} search purge <index_names>...
Usage: {0} search purge <strategy_names>...
"""
for index_name in args['<index_names>']:
specific_index = index_strategy.get_specific_index(index_name)
specific_index.pls_delete()
for _strategy_name in args['<strategy_names>']:
_strategy = index_strategy.parse_strategy_name(_strategy_name)
_strategy.pls_teardown()


@search.subcommand('Create indicies and apply mappings')
Expand All @@ -41,25 +41,16 @@ def setup(args, argv):
"""
_is_initial = args.get('--initial')
if _is_initial:
_specific_indexes = [
_index_strategy.for_current_index()
for _index_strategy in index_strategy.all_index_strategies().values()
]
for _index_strategy in index_strategy.each_strategy():
_index_strategy.pls_setup()
else:
_index_or_strategy_name = args['<index_or_strategy_name>']
try:
_specific_indexes = [index_strategy.get_specific_index(_index_or_strategy_name)]
_strategy = index_strategy.get_strategy(_index_or_strategy_name)
except IndexStrategyError:
try:
_specific_indexes = [
index_strategy.get_specific_index(_index_or_strategy_name),
]
except IndexStrategyError:
raise IndexStrategyError(f'unrecognized index or strategy name "{_index_or_strategy_name}"')
for _specific_index in _specific_indexes:
_specific_index.pls_setup(
skip_backfill=_is_initial, # for initial setup, there's nothing back to fill
)
raise IndexStrategyError(f'unrecognized index or strategy name "{_index_or_strategy_name}"')
else:
_strategy.pls_setup()


@search.subcommand('Start the search indexing daemon')
Expand Down
2 changes: 1 addition & 1 deletion share/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ def check_all_index_strategies_current(app_configs, **kwargs):
from share.search import index_strategy
from share.search.exceptions import IndexStrategyError
errors = []
for _index_strategy in index_strategy.all_index_strategies().values():
for _index_strategy in index_strategy.each_strategy():
try:
_index_strategy.assert_strategy_is_current()
except IndexStrategyError as exception:
Expand Down
18 changes: 14 additions & 4 deletions share/models/index_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ def __repr__(self):
def __str__(self):
return repr(self)

@property
def strategy_checksum(self):
# back-compat alias for specific_indexname (may be removed if that's renamed via migration)
return self.specific_indexname # for backcompat

@strategy_checksum.setter
def strategy_checksum(self, value):
# back-compat alias for specific_indexname (may be removed if that's renamed via migration)
self.specific_indexname = value

@contextlib.contextmanager
def mutex(self):
with IndexBackfill.objects.get_with_mutex(pk=self.pk) as index_backfill:
Expand All @@ -76,14 +86,14 @@ def mutex(self):

def pls_start(self, index_strategy):
with self.mutex() as locked_self:
assert locked_self.index_strategy_name == index_strategy.name
current_index = index_strategy.for_current_index()
if locked_self.specific_indexname == current_index.indexname:
assert locked_self.index_strategy_name == index_strategy.strategy_name
_current_checksum = str(index_strategy.CURRENT_STRATEGY_CHECKSUM)
if locked_self.strategy_checksum == _current_checksum:
# what is "current" has not changed -- should be INITIAL
assert locked_self.backfill_status == IndexBackfill.INITIAL
else:
# what is "current" has changed! disregard backfill_status
locked_self.specific_indexname = current_index.indexname
locked_self.strategy_checksum = _current_checksum
locked_self.backfill_status = IndexBackfill.INITIAL
locked_self.__update_error(None)
try:
Expand Down
13 changes: 7 additions & 6 deletions share/search/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def start_daemonthreads_for_strategy(self, index_strategy):
return _daemon

def start_all_daemonthreads(self):
for _index_strategy in index_strategy.all_index_strategies().values():
for _index_strategy in index_strategy.each_strategy():
self.start_daemonthreads_for_strategy(_index_strategy)

def stop_daemonthreads(self, *, wait=False):
Expand Down Expand Up @@ -119,7 +119,7 @@ def get_consumers(self, Consumer, channel):
]

def __repr__(self):
return '<{}({})>'.format(self.__class__.__name__, self.__index_strategy.name)
return '<{}({})>'.format(self.__class__.__name__, self.__index_strategy.strategy_name)

def consume(self, *args, **kwargs):
# wrap `consume` in `kombu.Connection.ensure`, following guidance from
Expand Down Expand Up @@ -191,7 +191,7 @@ def on_message(self, body, message):
continue

def __repr__(self):
return '<{}({})>'.format(self.__class__.__name__, self.index_strategy.name)
return '<{}({})>'.format(self.__class__.__name__, self.index_strategy.strategy_name)


@dataclasses.dataclass
Expand Down Expand Up @@ -232,11 +232,12 @@ def _the_loop_itself(self):
def _raise_if_backfill_noncurrent(self):
if self.message_type.is_backfill:
index_backfill = self.index_strategy.get_or_create_backfill()
if index_backfill.specific_indexname != self.index_strategy.current_indexname:
_current_checksum = str(self.index_strategy.CURRENT_STRATEGY_CHECKSUM)
if index_backfill.strategy_checksum != _current_checksum:
raise exceptions.DaemonSetupError(
'IndexerDaemon observes conflicting currence:'
f'\n\tIndexBackfill (from database) says current is "{index_backfill.specific_indexname}"'
f'\n\tIndexStrategy (from static code) says current is "{self.index_strategy.current_indexname}"'
f'\n\tIndexBackfill (from database) says current is "{index_backfill.strategy_checksum}"'
f'\n\tIndexStrategy (from static code) says current is "{_current_checksum}"'
'\n\t(may be the daemon is running old code -- will die and retry,'
' but if this keeps happening you may need to reset backfill_status'
' to INITIAL and restart the backfill)'
Expand Down
2 changes: 1 addition & 1 deletion share/search/index_messenger.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, *, celery_app=None, index_strategys=None):
if celery_app is None
else celery_app
)
self.index_strategys = index_strategys or tuple(index_strategy.all_index_strategies().values())
self.index_strategys = index_strategys or tuple(index_strategy.each_strategy())

def notify_indexcard_update(self, indexcards, *, urgent=False):
self.send_messages_chunk(
Expand Down
Loading