Skip to content

Commit b2b3369

Browse files
committed
[A] Workaround: Manifest content hash computation times out (#6123)
1 parent 8bfb401 commit b2b3369

File tree

10 files changed

+137
-34
lines changed

10 files changed

+137
-34
lines changed

environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -943,5 +943,9 @@ def env() -> Mapping[str, Optional[str]]:
943943
# $1 per one million requests above ten million requests. The blocking
944944
# only applies to URLs disallowed via robots.txt.
945945
#
946-
'azul_waf_bot_control': '0'
946+
'azul_waf_bot_control': '0',
947+
948+
# Whether to enable bundle notifications for incremental index changes
949+
#
950+
'AZUL_ENABLE_BUNDLE_NOTIFICATIONS': '0'
947951
}

scripts/generate_openapi_document.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ def main():
5252
patch_config(f'{lambda_name}_function_name', f'azul-{lambda_name}-dev'),
5353
patch_config('enable_log_forwarding', False),
5454
patch_config('enable_replicas', True),
55-
patch_config('monitoring_email', '[email protected]')
55+
patch_config('monitoring_email', '[email protected]'),
56+
patch_config('enable_bundle_notifications', True)
5657
):
5758
lambda_endpoint = furl('http://localhost')
5859
with patch.object(target=AzulChaliceApp,

scripts/reindex.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,22 @@
4545
parser.add_argument('--local',
4646
default=False,
4747
action='store_true',
48-
help='Do not offload the listing of subgraphs to the indexer Lambda function. When this option is '
49-
'used, this script queries the repository without partitioning, and the indexer notification '
50-
'endpoint is invoked for each subgraph individually and concurrently using worker threads. '
51-
'This is magnitudes slower than remote (i.e. partitioned) indexing. If this option is not '
52-
'used (the default), the set of subgraphs matching the query is partitioned using the '
53-
'partition prefix length configured for each of the catalog sources being reindexed. Each '
54-
'query partition is processed independently and remotely by the indexer lambda. The index '
55-
'Lambda function queries the repository for each partition and queues a notification for each '
56-
'matching subgraph in the partition.')
48+
help=(
49+
'' if config.enable_bundle_notifications else '**DISABLED** '
50+
) + (
51+
'Do not offload the listing of subgraphs to the indexer Lambda function. When '
52+
'this option is used, this script queries the repository without partitioning, '
53+
'and the indexer notification endpoint is invoked for each subgraph '
54+
'individually and concurrently using worker threads. This is magnitudes slower '
55+
'than remote (i.e. partitioned) indexing. If this option is not used (the '
56+
'default), the set of subgraphs matching the query is partitioned using the '
57+
'partition prefix length configured for each of the catalog sources being '
58+
'reindexed. Each query partition is processed independently and remotely by '
59+
'the indexer lambda. The index Lambda function queries the repository for each '
60+
'partition and queues a notification for each matching subgraph in the '
61+
'partition.'
62+
)
63+
)
5764
parser.add_argument('--catalogs',
5865
nargs='+',
5966
metavar='NAME',
@@ -133,6 +140,10 @@ def main(argv: list[str]):
133140
deindex = args.deindex or (args.delete and not every_source)
134141
delete = args.delete and every_source
135142

143+
if args.local and not config.enable_bundle_notifications:
144+
parser.error('Local reindexing is not available while bundle '
145+
'notifications are disabled.')
146+
136147
if every_source:
137148
if deindex:
138149
parser.error('--deindex is incompatible with source `*`. Use --delete instead.')

src/azul/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,6 +1858,10 @@ def enable_mirroring(self) -> bool:
18581858
def mirror_bucket(self) -> str | None:
18591859
return self.environ.get('AZUL_MIRROR_BUCKET')
18601860

1861+
@property
1862+
def enable_bundle_notifications(self):
1863+
return self._boolean(self.environ['AZUL_ENABLE_BUNDLE_NOTIFICATIONS'])
1864+
18611865

18621866
config: Config = Config() # yes, the type hint does help PyCharm
18631867

src/azul/azulclient.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def handle_future(future: Future) -> None:
228228
log.error('Unsent notifications and their HTTP status code:\n%s',
229229
printer.pformat(missing))
230230
if errors or missing:
231-
raise AzulClientNotificationError
231+
raise AzulClientNotificationError(set(errors.keys()))
232232

233233
def matching_sources(self,
234234
catalogs: Iterable[CatalogName],
@@ -431,5 +431,5 @@ class AzulClientError(RuntimeError):
431431

432432
class AzulClientNotificationError(AzulClientError):
433433

434-
def __init__(self) -> None:
435-
super().__init__('Some notifications could not be sent')
434+
def __init__(self, *args, **kwargs) -> None:
435+
super().__init__('Some notifications could not be sent', *args, **kwargs)

src/azul/indexer/index_controller.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def handlers(self) -> dict[str, Any]:
7474
@self.app.route(
7575
'/{catalog}/{action}',
7676
methods=['POST'],
77+
enabled=config.enable_bundle_notifications,
7778
spec={
7879
'tags': ['Indexing'],
7980
'summary': 'Notify the indexer to perform an action on a bundle',
@@ -213,6 +214,8 @@ def handle_notification(self, catalog: CatalogName, action: str):
213214
raise R.propagate(e, chalice.BadRequestError)
214215
notification = request.json_body
215216
log.info('Received notification %r for catalog %r', notification, catalog)
217+
assert config.enable_bundle_notifications, R(
218+
'Bundle notifications are disabled')
216219
self._validate_notification(notification)
217220
service = self.index_queue_service
218221
message = service.index_bundle_message(self._load_action(action),

src/azul/service/manifest_service.py

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
from azul import (
7373
CatalogName,
7474
R,
75+
cache,
7576
cached_property,
7677
config,
7778
mutable_furl,
@@ -982,7 +983,9 @@ def manifest_key(self) -> ManifestKey:
982983
"""
983984
git_commit = config.lambda_git_status['commit']
984985
filter_string = repr(sort_frozen(freeze(self.filters.explicit)))
985-
content_hash = str(self.manifest_content_hash)
986+
# If incremental index changes are disabled, we don't need to worry
987+
# about individual bundles, only sources.
988+
content_hash = str(self.manifest_hash(config.enable_bundle_notifications))
986989
catalog = self.catalog
987990
format = self.format()
988991
manifest_hash_input = [
@@ -1032,9 +1035,11 @@ def file_name(self,
10321035
file_name = atlas + '-manifest-' + self.s3_object_key_base(manifest_key)
10331036
return file_name
10341037

1035-
def _create_request(self) -> Search:
1038+
def _create_request(self, entity_type: str | None = None) -> Search:
1039+
if entity_type is None:
1040+
entity_type = self.entity_type
10361041
pipeline = self._create_pipeline()
1037-
request = self.service.create_request(self.catalog, self.entity_type)
1042+
request = self.service.create_request(self.catalog, entity_type)
10381043
request = pipeline.prepare_request(request)
10391044
# The response is processed by the generator, not the pipeline
10401045
return request
@@ -1144,22 +1149,58 @@ def _azul_file_url(self,
11441149
fetch=False,
11451150
**args))
11461151

1147-
@cached_property
1148-
def manifest_content_hash(self) -> int:
1149-
log.debug('Computing content hash for manifest using filters %r ...', self.filters)
1152+
@cache
1153+
def manifest_hash(self, by_bundle: bool) -> int:
1154+
"""
1155+
Return a content hash for the manifest.
1156+
1157+
If `by_bundle` is True, the hash is computed from the fully-qualified
1158+
identifiers of all bundles containing files that match the current
1159+
filter. The return value approximates a hash of the content of the
1160+
manifest because a change of the file data requires a change to the file
1161+
metadata which requires a new bundle or bundle version.
1162+
1163+
If `by_bundle` is False, the hash is computed from the identifiers of
1164+
the sources from which projects/datasets containing files matching the
1165+
current filters were indexed. It's worth noting that a filter may match
1166+
a project/dataset but none of the project's files. For example, if a
1167+
project contains only files derived from either mouse brains or lion
1168+
hearts, the project will match the filter `species=lion and
1169+
organ=brain`, but none of its files will. If such a project/dataset is
1170+
added/removed to/from the index, the manifest hash returned for a given
1171+
filter will be different even though the contents of the manifest haven't
1172+
changed, as no matching files were added or removed.
1173+
1174+
So while the hash computed from the sources is less sensitive than the
1175+
one computed from the bundles, it can be computed much more quickly.
1176+
"""
1177+
log.debug('Computing content hash for manifest from %s using %r ...',
1178+
'bundles' if by_bundle else 'sources', self.filters)
11501179
start_time = time.time()
1151-
request = self._create_request()
1180+
if by_bundle:
1181+
request = self._create_request()
1182+
else:
1183+
root_entity_type = self.metadata_plugin.root_entity_type
1184+
request = self._create_request(entity_type=root_entity_type)
11521185
request.aggs.metric(
11531186
'hash',
11541187
'scripted_metric',
11551188
init_script='''
11561189
state.fields = 0
11571190
''',
1158-
map_script='''
1159-
for (bundle in params._source.bundles) {
1160-
state.fields += (bundle.uuid + bundle.version).hashCode()
1161-
}
1162-
''',
1191+
map_script=(
1192+
'''
1193+
for (bundle in params._source.bundles) {
1194+
state.fields += (bundle.uuid + bundle.version).hashCode()
1195+
}
1196+
'''
1197+
if by_bundle else
1198+
'''
1199+
for (source in params._source.sources) {
1200+
state.fields += source.id.hashCode()
1201+
}
1202+
'''
1203+
),
11631204
combine_script='''
11641205
return state.fields.hashCode()
11651206
''',

test/azul_test_case.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ def setUpClass(cls) -> None:
242242
cls._patch_dss_query_prefix()
243243
cls._patch_lambda_env()
244244
cls._patch_valid_schema_domains()
245+
cls._patch_enable_bundle_notifications()
245246

246247
def setUp(self) -> None:
247248
super().setUp()
@@ -344,6 +345,13 @@ def _patch_valid_schema_domains(cls):
344345
attribute='valid_schema_domains',
345346
new=cls.valid_schema_domains))
346347

348+
@classmethod
349+
def _patch_enable_bundle_notifications(cls):
350+
cls.addClassPatch(patch.object(target=type(config),
351+
attribute='enable_bundle_notifications',
352+
new_callable=PropertyMock,
353+
return_value=True))
354+
347355

348356
class CatalogTestCase(AzulUnitTestCase, metaclass=ABCMeta):
349357
catalog: CatalogName = 'test'

test/integration_test.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1746,10 +1746,12 @@ class AzulClientIntegrationTest(IntegrationTestCase):
17461746
def test_azul_client_error_handling(self):
17471747
invalid_notification = {}
17481748
notifications = [invalid_notification]
1749-
self.assertRaises(AzulClientNotificationError,
1750-
self.azul_client.index,
1751-
first(config.integration_test_catalogs),
1752-
notifications)
1749+
with self.assertRaises(AzulClientNotificationError) as cm:
1750+
self.azul_client.index(catalog=first(config.integration_test_catalogs),
1751+
notifications=notifications)
1752+
self.assertEqual('Some notifications could not be sent', cm.exception.args[0])
1753+
expected = 400 if config.enable_bundle_notifications else 403
1754+
self.assertEqual({expected}, cm.exception.args[1])
17531755

17541756

17551757
class OpenAPIIntegrationTest(AzulTestCase):

test/service/test_manifest.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -902,37 +902,57 @@ def test_manifest_filter_validation(self):
902902
response = requests.put(str(url))
903903
self.assertEqual(400, response.status_code, response.content)
904904

905-
def test_manifest_content_disposition_header(self):
905+
@patch.object(type(config),
906+
'enable_bundle_notifications',
907+
new=PropertyMock(return_value=True))
908+
def test_content_disposition_header_with_notifications_enabled(self) -> None:
909+
self._test_content_disposition_header()
910+
911+
@patch.object(type(config),
912+
'enable_bundle_notifications',
913+
new=PropertyMock(return_value=False))
914+
def test_content_disposition_header_with_notifications_disabled(self) -> None:
915+
self._test_content_disposition_header()
916+
917+
def _test_content_disposition_header(self):
906918
bundle_fqid = self.bundle_fqid(uuid='f79257a7-dfc6-46d6-ae00-ba4b25313c10',
907919
version='2018-09-14T13:33:14.453337Z')
908920
self._index_canned_bundle(bundle_fqid)
909921
with patch.object(manifest_service, 'datetime') as mock_datetime:
910922
mock_datetime.now.return_value = datetime(1985, 10, 25, 1, 21)
911923
for format in [ManifestFormat.compact]:
912-
for filters, expected_name in [
924+
source_hash = '4bc67e84-4873-591f-b524-a5fe4ec215eb'
925+
for filters, name_with_notif_enabled, name_with_notif_disabled in [
913926
# For a single project, the content disposition file name should
914927
# be the project name followed by the date and time
915928
(
916929
{'project': {'is': ['Single of human pancreas']}},
930+
'Single of human pancreas 1985-10-25 01.21',
917931
'Single of human pancreas 1985-10-25 01.21'
918932
),
919933
# In all other cases, the standard content disposition file name
920934
# should be "hca-manifest-" followed by the manifest key,
921935
# a pair of deterministically derived v5 UUIDs.
922936
(
923937
{'project': {'is': ['Single of human pancreas', 'Mouse Melanoma']}},
924-
'hca-manifest-20d97863-d8cf-54f3-8575-0f9593d3d7ef.4bc67e84-4873-591f-b524-a5fe4ec215eb'
938+
'hca-manifest-20d97863-d8cf-54f3-8575-0f9593d3d7ef.' + source_hash,
939+
'hca-manifest-9f00706c-aa4f-5ae3-80ee-5328c80b6fce.' + source_hash
925940
),
926941
(
927942
{},
928-
'hca-manifest-c3cf398e-1927-5aae-ba2a-81d8d1800b2d.4bc67e84-4873-591f-b524-a5fe4ec215eb'
943+
'hca-manifest-c3cf398e-1927-5aae-ba2a-81d8d1800b2d.' + source_hash,
944+
'hca-manifest-f2adc097-6300-5d33-9fa4-34d97d9bb39f.' + source_hash
929945
)
930946
]:
931947
with self.subTest(filters=filters, format=format):
932948
manifest, num_partitions = self._get_manifest_object(format, filters)
933949
self.assertFalse(manifest.was_cached)
934950
self.assertEqual(1, num_partitions)
935951
url = furl(self._service.get_manifest_url(manifest))
952+
if config.enable_bundle_notifications:
953+
expected_name = name_with_notif_enabled
954+
else:
955+
expected_name = name_with_notif_disabled
936956
expected_cd = f'attachment;filename="{expected_name}.tsv"'
937957
actual_cd = url.args['response-content-disposition']
938958
self.assertEqual(expected_cd, actual_cd)
@@ -1057,9 +1077,18 @@ def test_compact_metadata_cache(self, _time_until_object_expires: MagicMock):
10571077
self.assertEqual([2, 2], list(map(len, file_names.values())))
10581078
self.assertEqual([1, 1], list(map(len, map(set, file_names.values()))))
10591079

1080+
@patch.object(type(config),
1081+
'enable_bundle_notifications',
1082+
new=PropertyMock(return_value=True))
10601083
def test_hash_validity_with_notifications_enabled(self) -> None:
10611084
self._test_hash_validity()
10621085

1086+
@patch.object(type(config),
1087+
'enable_bundle_notifications',
1088+
new=PropertyMock(return_value=False))
1089+
def test_hash_validity_with_notifications_disabled(self) -> None:
1090+
self._test_hash_validity()
1091+
10631092
def _test_hash_validity(self):
10641093
self.maxDiff = None
10651094
bundles_by_project = {

0 commit comments

Comments
 (0)