From 74976e2f6aff8d6e6baf8c624649302dd432e1cc Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Wed, 23 Jul 2025 17:58:01 -0700 Subject: [PATCH 01/14] Assert mirroring is enabled when handling messages --- src/azul/indexer/mirror_controller.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/azul/indexer/mirror_controller.py b/src/azul/indexer/mirror_controller.py index b99736a076..0cc44d905c 100644 --- a/src/azul/indexer/mirror_controller.py +++ b/src/azul/indexer/mirror_controller.py @@ -101,6 +101,7 @@ def mirror(event: chalice.app.SQSEvent): return super().handlers() | locals() def mirror(self, event: Iterable[SQSRecord]): + assert config.enable_mirroring, R('Mirroring is disabled') self._handle_events(event, self._mirror) def _mirror(self, action: MirrorAction, message: JSON): From a97eb69944304ba61ffadb4d5192342037c6e1e9 Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Wed, 23 Jul 2025 17:45:34 -0700 Subject: [PATCH 02/14] Assert mirrored sources are public --- src/azul/indexer/mirror_controller.py | 2 ++ test/indexer/test_mirror_controller.py | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/src/azul/indexer/mirror_controller.py b/src/azul/indexer/mirror_controller.py index 0cc44d905c..3d1c3a98df 100644 --- a/src/azul/indexer/mirror_controller.py +++ b/src/azul/indexer/mirror_controller.py @@ -134,6 +134,8 @@ def _mirror(self, action: MirrorAction, message: JSON): def mirror_source(self, catalog: CatalogName, source_json: JSON): plugin = self.repository_plugin(catalog) source = plugin.source_ref_cls.from_json(source_json) + assert source.id in plugin.list_source_ids(authentication=None), R( + 'Cannot mirror non-public source', source) # The desired partition size depends on the maximum number of messages # we can send in one Lambda invocation, because queueing the individual # mirror_file messages turns out to dominate the running time of diff --git a/test/indexer/test_mirror_controller.py b/test/indexer/test_mirror_controller.py index 216afb61f5..399901240d 100644 --- a/test/indexer/test_mirror_controller.py +++ b/test/indexer/test_mirror_controller.py @@ -39,6 +39,9 @@ from azul.plugins.metadata.hca import ( HCAFile, ) +from azul.plugins.repository.tdr import ( + TDRPlugin, +) from azul.types import ( JSON, ) @@ -65,6 +68,17 @@ class TestMirrorController(DCP2TestCase, WorkQueueTestCase, MirrorTestCase): + @classmethod + def _patch_list_source_ids(cls): + cls.addClassPatch(patch.object(TDRPlugin, + 'list_source_ids', + return_value={cls.source.id})) + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._patch_list_source_ids() + @classmethod def lambda_name(cls) -> str: return 'indexer' From b42546af1c7f1b6ca424cc715a41d9e522de3f22 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 28 Oct 2025 22:09:52 -0700 Subject: [PATCH 03/14] Move file size check from mirror_file to mirror_partition --- src/azul/indexer/mirror_controller.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/azul/indexer/mirror_controller.py b/src/azul/indexer/mirror_controller.py index 3d1c3a98df..bd7802b9d7 100644 --- a/src/azul/indexer/mirror_controller.py +++ b/src/azul/indexer/mirror_controller.py @@ -169,10 +169,19 @@ def mirror_partition(self, source = plugin.source_ref_cls.from_json(source_json) files = plugin.list_files(source, prefix) + deployment_is_stable = (config.deployment.is_stable + and not config.deployment.is_unit_test + and catalog not in config.integration_test_catalogs) + def messages() -> 
Iterable[SQSMessage]: for file in files: - log.debug('Queueing file %r', file) - yield self.mirror_file_message(catalog, source, file) + assert file.size is not None, R('File size unknown', file) + file_is_large = file.size > 1.5 * 1024 ** 3 + if file_is_large and not deployment_is_stable: + log.info('Not mirroring file to save cost: %r', file) + else: + log.debug('Queueing file %r', file) + yield self.mirror_file_message(catalog, source, file) self.client.queue_mirror_messages(messages()) log.info('Queued %d files in partition %r of source %r in catalog %r', @@ -184,16 +193,8 @@ def mirror_file(self, ): file = self.load_file(catalog, file_json) assert file.size is not None, R('File size unknown', file) - - file_is_large = file.size > 1.5 * 1024 ** 3 - deployment_is_stable = (config.deployment.is_stable - and not config.deployment.is_unit_test - and catalog not in config.integration_test_catalogs) - service = self.service(catalog) - if file_is_large and not deployment_is_stable: - log.info('Not mirroring file to save cost: %r', file) - elif service.info_exists(file): + if service.info_exists(file): log.info('File is already mirrored, skipping upload: %r', file) elif service.file_exists(file): assert False, R('File object is already present', file) From ad637704274ea04e19ea67326e1afd14e7cb4213 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 28 Oct 2025 23:43:03 -0700 Subject: [PATCH 04/14] Refactor mirror controller test --- test/indexer/test_mirror_controller.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/test/indexer/test_mirror_controller.py b/test/indexer/test_mirror_controller.py index 399901240d..ca6895a44d 100644 --- a/test/indexer/test_mirror_controller.py +++ b/test/indexer/test_mirror_controller.py @@ -85,6 +85,7 @@ def lambda_name(cls) -> str: def test_mirroring(self): self._create_mock_queues(config.mirror_queue_names) + file = self._file with self.subTest('remote_mirror'): source_message = self._test_remote_mirror() @@ -92,7 +93,7 @@ def test_mirroring(self): partition_message = self._test_mirror_source(source_message) with self.subTest('mirror_partition'): - file, file_message = self._test_mirror_partition(partition_message) + file_message = self._test_mirror_partition(partition_message, [file]) with self.subTest('mirror_file', corrupted=False, exists=False): self._test_mirror_file(file, file_message) @@ -109,6 +110,14 @@ def test_mirroring(self): _file_contents = b'lorem ipsum dolor sit\n' + _file = HCAFile(uuid='405852c9-a0cc-4cd8-b9ff-7c6296223661', + name='foo.txt', + version=None, + drs_uri='drs://fake-domain.lan/foo', + size=len(_file_contents), + content_type='text/plain', + sha256=hashlib.sha256(_file_contents).hexdigest()) + @property def mirror_controller(self) -> MirrorController: return self.app_module.app.mirror_controller @@ -140,25 +149,18 @@ def _test_mirror_source(self, source_message): self.assertEqual(list(self.source.prefix.partition_prefixes()), partitions) return partition_message - def _test_mirror_partition(self, partition_message): + def _test_mirror_partition(self, partition_message, files: list[HCAFile]): event = self._mirror_event(partition_message) - file = HCAFile(uuid='405852c9-a0cc-4cd8-b9ff-7c6296223661', - name='foo.txt', - version=None, - drs_uri='drs://fake-domain.lan/foo', - size=len(self._file_contents), - content_type='text/plain', - sha256=hashlib.sha256(self._file_contents).hexdigest()) plugin_cls = type(self.client.repository_plugin(self.catalog)) - with patch.object(plugin_cls, 
'list_files', return_value=[file]): + with patch.object(plugin_cls, 'list_files', return_value=files): self.mirror_controller.mirror(event) file_message = one(self._read_queue(self.client.mirror_queue())) expected_message = dict(action='mirror_file', catalog=self.catalog, source=self.source.to_json(), - file=file.to_json()) + file=self._file.to_json()) self.assertEqual(expected_message, file_message) - return file, file_message + return file_message def _test_mirror_file(self, file, file_message): event = self._mirror_event(file_message) From 59cfa83e0a39399d695d467b3829de5c854b6b31 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Wed, 29 Oct 2025 00:38:34 -0700 Subject: [PATCH 05/14] Refactor IT source selection --- test/integration_test.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/test/integration_test.py b/test/integration_test.py index d920330e39..8e5f47e4f3 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -311,8 +311,11 @@ def _select_source(self, sources. """ plugin = self.repository_plugin(catalog) - sources = set(plugin.sources) - if public is not None: + sources = plugin.sources + + if public is None: + ma_sources = set() + else: ma_sources = { source.spec # This would raise a KeyError during the can bundle script test @@ -321,12 +324,20 @@ def _select_source(self, for source in self.managed_access_sources_by_catalog[catalog] } self.assertIsSubset(ma_sources, sources) - if public is True: - sources -= ma_sources + + def _filter(source: SourceSpec) -> bool: + if public is None: + valid = True + elif public is True: + valid = source not in ma_sources elif public is False: - sources &= ma_sources + valid = source in ma_sources else: assert False, public + return valid + + sources = set(filter(_filter, sources)) + if len(sources) == 0: assert public is False, 'An IT catalog must contain at least one public source' return None From 029507d412209db24c6bae3bb14235fd8966afac Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Mon, 21 Jul 2025 20:17:14 -0700 Subject: [PATCH 06/14] [u 1/6] Use consistent value for pop flag (#7066) --- deployments/sandbox/environment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deployments/sandbox/environment.py b/deployments/sandbox/environment.py index dd63a1f974..d644150763 100644 --- a/deployments/sandbox/environment.py +++ b/deployments/sandbox/environment.py @@ -8,7 +8,7 @@ is_sandbox = True -pop = 2 # remove snapshot +pop = 1 # remove snapshot type ProjectName = str type SourceSpec = str From 04ac4db01078c45876e5ca855a7f7fb53ced6be5 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 14 Oct 2025 21:41:27 -0700 Subject: [PATCH 07/14] [u 2/6] Rename mklist to mksrcs (#7066) --- UPGRADING.rst | 2 +- deployments/anvilbox/environment.py | 4 ++-- deployments/anvildev/environment.py | 4 ++-- deployments/anvilprod/environment.py | 4 ++-- deployments/dev/environment.py | 4 ++-- deployments/hammerbox/environment.py | 4 ++-- deployments/prod/environment.py | 6 +++--- deployments/sandbox/environment.py | 4 ++-- deployments/tempdev/environment.py | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/UPGRADING.rst b/UPGRADING.rst index aac9382330..95bb334d11 100644 --- a/UPGRADING.rst +++ b/UPGRADING.rst @@ -43,7 +43,7 @@ Follow the steps above for all shared deployments. In your personal deployments' ``environment.py`` files: -1. Update the type annotations for ``bqsrc``, ``mksrc``, ``mkdelta``, ``mklist``, +1. 
Update the type annotations for ``bqsrc``, ``mksrc``, ``mkdelta``, ``mksrcs``, ``mkdict``, and ``env``. 2. Remove the ``prefix`` parameter and its uses from ``bqsrc`` and ``mksrc``. diff --git a/deployments/anvilbox/environment.py b/deployments/anvilbox/environment.py index fa37efcbdf..8d620af7c6 100644 --- a/deployments/anvilbox/environment.py +++ b/deployments/anvilbox/environment.py @@ -50,7 +50,7 @@ def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] return result -def mklist(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: +def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: return list(filter(None, catalog.values())) @@ -59,7 +59,7 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], delta: dict[ProjectName, SourceSpec | None], ) -> dict[ProjectName, SourceSpec | None]: catalog = previous_catalog | delta - num_actual = len(mklist(catalog)) + num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) return catalog diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py index 2e99c406a4..30979b719a 100644 --- a/deployments/anvildev/environment.py +++ b/deployments/anvildev/environment.py @@ -48,7 +48,7 @@ def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] return result -def mklist(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: +def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: return list(filter(None, catalog.values())) @@ -57,7 +57,7 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], delta: dict[ProjectName, SourceSpec | None], ) -> dict[ProjectName, SourceSpec | None]: catalog = previous_catalog | delta - num_actual = len(mklist(catalog)) + num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) return catalog diff --git a/deployments/anvilprod/environment.py b/deployments/anvilprod/environment.py index a082739490..8209330615 100644 --- a/deployments/anvilprod/environment.py +++ b/deployments/anvilprod/environment.py @@ -51,7 +51,7 @@ def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] return result -def mklist(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: +def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: return list(filter(None, catalog.values())) @@ -60,7 +60,7 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], delta: dict[ProjectName, SourceSpec | None], ) -> dict[ProjectName, SourceSpec | None]: catalog = previous_catalog | delta - num_actual = len(mklist(catalog)) + num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) return catalog diff --git a/deployments/dev/environment.py b/deployments/dev/environment.py index fc51b08ede..bc9a71ad24 100644 --- a/deployments/dev/environment.py +++ b/deployments/dev/environment.py @@ -37,7 +37,7 @@ def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] return result -def mklist(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: +def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: return list(filter(None, catalog.values())) @@ -46,7 +46,7 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], delta: dict[ProjectName, SourceSpec | None], ) -> dict[ProjectName, SourceSpec | None]: catalog = previous_catalog | delta - num_actual = len(mklist(catalog)) + num_actual = len(mksrcs(catalog)) assert 
num_expected == num_actual, (num_expected, num_actual) return catalog diff --git a/deployments/hammerbox/environment.py b/deployments/hammerbox/environment.py index b8faf4f6d7..b98a458679 100644 --- a/deployments/hammerbox/environment.py +++ b/deployments/hammerbox/environment.py @@ -53,7 +53,7 @@ def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] return result -def mklist(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: +def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: return list(filter(None, catalog.values())) @@ -62,7 +62,7 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], delta: dict[ProjectName, SourceSpec | None], ) -> dict[ProjectName, SourceSpec | None]: catalog = previous_catalog | delta - num_actual = len(mklist(catalog)) + num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) return catalog diff --git a/deployments/prod/environment.py b/deployments/prod/environment.py index 4edc0c5bdb..62cf820765 100644 --- a/deployments/prod/environment.py +++ b/deployments/prod/environment.py @@ -39,7 +39,7 @@ def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] return result -def mklist(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: +def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: return list(filter(None, catalog.values())) @@ -48,7 +48,7 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], delta: dict[ProjectName, SourceSpec | None], ) -> dict[ProjectName, SourceSpec | None]: catalog = previous_catalog | delta - num_actual = len(mklist(catalog)) + num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) return catalog @@ -1853,7 +1853,7 @@ def env() -> Mapping[str, str | None]: internal=internal, plugins=dict(metadata=dict(name='hca'), repository=dict(name='tdr_hca')), - sources=mklist(sources)) + sources=mksrcs(sources)) for atlas, catalog, sources in [ ('hca', 'dcp54', dcp54_sources), ('lungmap', 'lm9', lm9_sources) diff --git a/deployments/sandbox/environment.py b/deployments/sandbox/environment.py index d644150763..cf55ba5a54 100644 --- a/deployments/sandbox/environment.py +++ b/deployments/sandbox/environment.py @@ -39,7 +39,7 @@ def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] return result -def mklist(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: +def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: return list(filter(None, catalog.values())) @@ -48,7 +48,7 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], delta: dict[ProjectName, SourceSpec | None], ) -> dict[ProjectName, SourceSpec | None]: catalog = previous_catalog | delta - num_actual = len(mklist(catalog)) + num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) return catalog diff --git a/deployments/tempdev/environment.py b/deployments/tempdev/environment.py index 9c1a405575..c158a9061e 100644 --- a/deployments/tempdev/environment.py +++ b/deployments/tempdev/environment.py @@ -37,7 +37,7 @@ def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] return result -def mklist(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: +def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: return list(filter(None, catalog.values())) @@ -46,7 +46,7 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], delta: 
dict[ProjectName, SourceSpec | None], ) -> dict[ProjectName, SourceSpec | None]: catalog = previous_catalog | delta - num_actual = len(mklist(catalog)) + num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) return catalog From 6c93459efb9162ab18604fd5e37d3685159b46ee Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 14 Oct 2025 21:42:23 -0700 Subject: [PATCH 08/14] [u 3/6] Consistently use mksrcs function (#7066) --- deployments/anvilbox/environment.py | 2 +- deployments/anvildev/environment.py | 2 +- deployments/anvilprod/environment.py | 2 +- deployments/dev/environment.py | 2 +- deployments/hammerbox/environment.py | 2 +- deployments/sandbox/environment.py | 2 +- deployments/tempdev/environment.py | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/deployments/anvilbox/environment.py b/deployments/anvilbox/environment.py index 8d620af7c6..2ac4f7c9c8 100644 --- a/deployments/anvilbox/environment.py +++ b/deployments/anvilbox/environment.py @@ -119,7 +119,7 @@ def env() -> Mapping[str, str | None]: internal=internal, plugins=dict(metadata=dict(name='anvil'), repository=dict(name='tdr_anvil')), - sources=list(filter(None, sources.values()))) + sources=mksrcs(sources)) for atlas, catalog, sources in [ ('anvil', 'anvil', anvil_sources), ] diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py index 30979b719a..fd36ee71df 100644 --- a/deployments/anvildev/environment.py +++ b/deployments/anvildev/environment.py @@ -106,7 +106,7 @@ def env() -> Mapping[str, str | None]: internal=internal, plugins=dict(metadata=dict(name='anvil'), repository=dict(name='tdr_anvil')), - sources=list(filter(None, sources.values()))) + sources=mksrcs(sources)) for atlas, catalog, sources in [ ('anvil', 'anvil', anvil_sources), ] diff --git a/deployments/anvilprod/environment.py b/deployments/anvilprod/environment.py index 8209330615..4f72feb7ee 100644 --- a/deployments/anvilprod/environment.py +++ b/deployments/anvilprod/environment.py @@ -1232,7 +1232,7 @@ def env() -> Mapping[str, str | None]: internal=internal, plugins=dict(metadata=dict(name='anvil'), repository=dict(name='tdr_anvil')), - sources=list(filter(None, sources.values()))) + sources=mksrcs(sources)) for atlas, catalog, sources in [ ('anvil', 'anvil11', anvil11_sources), ] diff --git a/deployments/dev/environment.py b/deployments/dev/environment.py index bc9a71ad24..7e8fd52a26 100644 --- a/deployments/dev/environment.py +++ b/deployments/dev/environment.py @@ -217,7 +217,7 @@ def env() -> Mapping[str, str | None]: internal=internal, plugins=dict(metadata=dict(name='hca'), repository=dict(name='tdr_hca')), - sources=list(filter(None, sources.values()))) + sources=mksrcs(sources)) for atlas, catalog, sources in [ ('hca', 'dcp3', dcp3_sources), ('lungmap', 'lungmap', lungmap_sources), diff --git a/deployments/hammerbox/environment.py b/deployments/hammerbox/environment.py index b98a458679..bf63897b20 100644 --- a/deployments/hammerbox/environment.py +++ b/deployments/hammerbox/environment.py @@ -1246,7 +1246,7 @@ def env() -> Mapping[str, str | None]: internal=internal, plugins=dict(metadata=dict(name='anvil'), repository=dict(name='tdr_anvil')), - sources=list(filter(None, sources.values()))) + sources=mksrcs(sources)) for atlas, catalog, sources in [ ('anvil', 'anvil11', anvil11_sources), ] diff --git a/deployments/sandbox/environment.py b/deployments/sandbox/environment.py index cf55ba5a54..b3e4ca6d69 100644 --- a/deployments/sandbox/environment.py +++ 
b/deployments/sandbox/environment.py @@ -230,7 +230,7 @@ def env() -> Mapping[str, str | None]: internal=internal, plugins=dict(metadata=dict(name='hca'), repository=dict(name='tdr_hca')), - sources=list(filter(None, sources.values()))) + sources=mksrcs(sources)) for atlas, catalog, sources in [ ('hca', 'dcp3', dcp3_sources), ('lungmap', 'lungmap', lungmap_sources), diff --git a/deployments/tempdev/environment.py b/deployments/tempdev/environment.py index c158a9061e..25c09085ea 100644 --- a/deployments/tempdev/environment.py +++ b/deployments/tempdev/environment.py @@ -97,7 +97,7 @@ def env() -> Mapping[str, str | None]: internal=internal, plugins=dict(metadata=dict(name='anvil'), repository=dict(name='tdr_anvil')), - sources=list(filter(None, sources.values()))) + sources=mksrcs(sources)) for atlas, catalog, sources in [ ('anvil', 'anvil', anvil_sources), ] From 1598e5f5019f85dcf84b379c6e140b6e2ee2ff97 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Tue, 14 Oct 2025 21:14:55 -0700 Subject: [PATCH 09/14] [u 4/6] Configure max mirrored file size per catalog (#7066) --- deployments/anvildev/environment.py | 1 + deployments/dev/environment.py | 10 ++++++++ deployments/hammerbox/environment.py | 1 + deployments/prod/environment.py | 10 ++++++++ deployments/sandbox/environment.py | 10 ++++++++ environment.py | 5 ++++ lambdas/service/openapi.json | 4 ++++ scripts/generate_openapi_document.py | 1 + src/azul/__init__.py | 5 ++++ src/azul/azulclient.py | 21 ++++++++++------- src/azul/indexer/mirror_controller.py | 8 ++----- src/azul/service/catalog_controller.py | 1 + test/azul_test_case.py | 3 +++ test/indexer/test_mirror_controller.py | 32 ++++++++++++++++++++++++-- test/integration_test.py | 3 ++- 15 files changed, 98 insertions(+), 17 deletions(-) diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py index fd36ee71df..75773550ee 100644 --- a/deployments/anvildev/environment.py +++ b/deployments/anvildev/environment.py @@ -104,6 +104,7 @@ def env() -> Mapping[str, str | None]: 'AZUL_CATALOGS': json.dumps({ f'{catalog}{suffix}': dict(atlas=atlas, internal=internal, + mirror_max_file_size=1.5 * 1024 ** 3, plugins=dict(metadata=dict(name='anvil'), repository=dict(name='tdr_anvil')), sources=mksrcs(sources)) diff --git a/deployments/dev/environment.py b/deployments/dev/environment.py index 7e8fd52a26..e58012b3fa 100644 --- a/deployments/dev/environment.py +++ b/deployments/dev/environment.py @@ -180,6 +180,15 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], ])) +def mirror_max_file_size(atlas: str, internal: bool) -> int | None: + if atlas == 'hca': + return 1.5 * 1024 ** 3 + elif atlas == 'lungmap': + return -1 + else: + assert False, atlas + + def env() -> Mapping[str, str | None]: """ Returns a dictionary that maps environment variable names to values. 
The @@ -215,6 +224,7 @@ def env() -> Mapping[str, str | None]: 'AZUL_CATALOGS': json.dumps({ f'{catalog}{suffix}': dict(atlas=atlas, internal=internal, + mirror_max_file_size=mirror_max_file_size(atlas, internal), plugins=dict(metadata=dict(name='hca'), repository=dict(name='tdr_hca')), sources=mksrcs(sources)) diff --git a/deployments/hammerbox/environment.py b/deployments/hammerbox/environment.py index bf63897b20..d6b142f682 100644 --- a/deployments/hammerbox/environment.py +++ b/deployments/hammerbox/environment.py @@ -1244,6 +1244,7 @@ def env() -> Mapping[str, str | None]: 'AZUL_CATALOGS': base64.b64encode(bz2.compress(json.dumps({ f'{catalog}{suffix}': dict(atlas=atlas, internal=internal, + mirror_max_file_size=1.5 * 1024 ** 3, plugins=dict(metadata=dict(name='anvil'), repository=dict(name='tdr_anvil')), sources=mksrcs(sources)) diff --git a/deployments/prod/environment.py b/deployments/prod/environment.py index 62cf820765..cfb8db4d5f 100644 --- a/deployments/prod/environment.py +++ b/deployments/prod/environment.py @@ -1817,6 +1817,15 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], ])) +def mirror_max_file_size(atlas: str, internal: bool) -> int | None: + if atlas == 'hca': + return 1.5 * 1024 ** 3 if internal else None + elif atlas == 'lungmap': + return -1 + else: + assert False, atlas + + def env() -> Mapping[str, str | None]: """ Returns a dictionary that maps environment variable names to values. The @@ -1851,6 +1860,7 @@ def env() -> Mapping[str, str | None]: 'AZUL_CATALOGS': base64.b64encode(bz2.compress(json.dumps({ f'{catalog}{suffix}': dict(atlas=atlas, internal=internal, + mirror_max_file_size=mirror_max_file_size(atlas, internal), plugins=dict(metadata=dict(name='hca'), repository=dict(name='tdr_hca')), sources=mksrcs(sources)) diff --git a/deployments/sandbox/environment.py b/deployments/sandbox/environment.py index b3e4ca6d69..c5f2849d0e 100644 --- a/deployments/sandbox/environment.py +++ b/deployments/sandbox/environment.py @@ -182,6 +182,15 @@ def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], ])) +def mirror_max_file_size(atlas: str, internal: bool) -> int | None: + if atlas == 'hca': + return 1.5 * 1024 ** 3 + elif atlas == 'lungmap': + return -1 + else: + assert False, atlas + + def env() -> Mapping[str, str | None]: """ Returns a dictionary that maps environment variable names to values. The @@ -228,6 +237,7 @@ def env() -> Mapping[str, str | None]: 'AZUL_CATALOGS': json.dumps({ f'{catalog}{suffix}': dict(atlas=atlas, internal=internal, + mirror_max_file_size=mirror_max_file_size(atlas, internal), plugins=dict(metadata=dict(name='hca'), repository=dict(name='tdr_hca')), sources=mksrcs(sources)) diff --git a/environment.py b/environment.py index 113941ad82..ecc236ca82 100644 --- a/environment.py +++ b/environment.py @@ -44,6 +44,7 @@ def env() -> Mapping[str, str | None]: # 'name': { # 'atlas': 'bar', # 'internal': True, + # 'mirror_max_file_size': -1, # 'plugins': { # plugin_type: {'name'=plugin_package}, # plugin_type: {'name'=plugin_package}, @@ -65,6 +66,10 @@ def env() -> Mapping[str, str | None]: # `plugin_package` denotes the concrete implementation of how to fulfill # that purpose. # + # Files larger than `mirror_max_file_size` will not be mirrored for + # that catalog. This property may be absent, null, or negative, all of + # which will result in no files being mirrored for that catalog. + # # The first catalog listed is the default catalog. # # A source represents a TDR snapshot or canned staging area to index. 
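For reference, the per-catalog size limit described in the comment above is enforced in two places by this patch, and the combined effect reduces to the following decision. This is a condensed sketch of the checks in `AzulClient.remote_mirror` and `MirrorController.mirror_partition`, not the actual implementation:

    def should_mirror(max_file_size: int | None, file_size: int) -> bool:
        # Sketch of the combined checks introduced by this patch in
        # AzulClient.remote_mirror and MirrorController.mirror_partition
        if max_file_size is None:
            return True   # no per-catalog limit configured
        elif max_file_size < 0:
            return False  # a negative limit disables mirroring for the catalog
        else:
            return file_size <= max_file_size
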
diff --git a/lambdas/service/openapi.json b/lambdas/service/openapi.json index b40698ce98..0bdf349e7a 100644 --- a/lambdas/service/openapi.json +++ b/lambdas/service/openapi.json @@ -764,6 +764,10 @@ "internal": { "type": "boolean" }, + "mirror_max_file_size": { + "type": "integer", + "format": "int64" + }, "plugins": { "type": "object", "properties": {}, diff --git a/scripts/generate_openapi_document.py b/scripts/generate_openapi_document.py index bd7bd15285..6b714ee4ea 100644 --- a/scripts/generate_openapi_document.py +++ b/scripts/generate_openapi_document.py @@ -37,6 +37,7 @@ def main(): 'dcp2': config.Catalog(name='dcp2', atlas='hca', internal=False, + mirror_max_file_size=None, plugins=dict(metadata=config.Catalog.Plugin(name='hca'), repository=config.Catalog.Plugin(name='tdr_hca')), sources=set()) diff --git a/src/azul/__init__.py b/src/azul/__init__.py index 435c8c7768..5cd29d1306 100644 --- a/src/azul/__init__.py +++ b/src/azul/__init__.py @@ -63,9 +63,11 @@ JSON, MutableJSON, json_bool, + json_int, json_mapping, json_sequence, json_str, + optional, ) from azul.vendored.frozendict import ( frozendict, @@ -881,6 +883,7 @@ class Catalog: Config.Catalog(name='dcp', atlas='hca', internal=False, + mirror_max_file_size=None, plugins={'metadata': Config.Catalog.Plugin(name='hca'), 'repository': Config.Catalog.Plugin(name='tdr_hca')}, sources=set()) @@ -916,6 +919,7 @@ def from_json(cls, spec: JSON) -> Self: name: str atlas: str internal: bool + mirror_max_file_size: int | None plugins: Mapping[str, Plugin] sources: Set[str] @@ -982,6 +986,7 @@ def from_json(cls, name: str, spec: JSON) -> Self: return cls(name=name, atlas=json_str(spec['atlas']), internal=json_bool(spec['internal']), + mirror_max_file_size=optional(json_int, spec.get('mirror_max_file_size')), plugins=plugins, sources=set(map(json_str, json_sequence(spec['sources'])))) diff --git a/src/azul/azulclient.py b/src/azul/azulclient.py index edef3893cb..89edaf02f6 100644 --- a/src/azul/azulclient.py +++ b/src/azul/azulclient.py @@ -389,14 +389,19 @@ def is_queue_empty(self, queue_name: str) -> bool: return length == 0 def remote_mirror(self, catalog: CatalogName, sources: Iterable[SourceRef]): - - def message(source: SourceRef): - log.info('Mirroring files in source %r from catalog %r', - str(source.spec), catalog) - return self.mirror_source_message(catalog, source) - - messages = map(message, sources) - self.queue_mirror_messages(messages) + max_file_size = config.catalogs[catalog].mirror_max_file_size + if max_file_size is not None and max_file_size < 0: + log.info('Not mirroring any files in catalog %r because the file ' + 'size limit is negative', catalog) + else: + + def message(source: SourceRef): + log.info('Mirroring files in source %r from catalog %r', + str(source.spec), catalog) + return self.mirror_source_message(catalog, source) + + messages = map(message, sources) + self.queue_mirror_messages(messages) def _get_non_empty_fail_queues(self) -> set[str]: return { diff --git a/src/azul/indexer/mirror_controller.py b/src/azul/indexer/mirror_controller.py index bd7802b9d7..02c38ed9be 100644 --- a/src/azul/indexer/mirror_controller.py +++ b/src/azul/indexer/mirror_controller.py @@ -168,16 +168,12 @@ def mirror_partition(self, plugin = self.repository_plugin(catalog) source = plugin.source_ref_cls.from_json(source_json) files = plugin.list_files(source, prefix) - - deployment_is_stable = (config.deployment.is_stable - and not config.deployment.is_unit_test - and catalog not in config.integration_test_catalogs) + max_size = 
config.catalogs[catalog].mirror_max_file_size def messages() -> Iterable[SQSMessage]: for file in files: assert file.size is not None, R('File size unknown', file) - file_is_large = file.size > 1.5 * 1024 ** 3 - if file_is_large and not deployment_is_stable: + if max_size is not None and file.size > max_size: log.info('Not mirroring file to save cost: %r', file) else: log.debug('Queueing file %r', file) diff --git a/src/azul/service/catalog_controller.py b/src/azul/service/catalog_controller.py index 6b985c5401..3616477d03 100644 --- a/src/azul/service/catalog_controller.py +++ b/src/azul/service/catalog_controller.py @@ -32,6 +32,7 @@ def list_catalogs(self) -> schema.object( additionalProperties=schema.object( atlas=str, internal=bool, + mirror_max_file_size=schema.optional(int), plugins=schema.object( additionalProperties=schema.object( name=str, diff --git a/test/azul_test_case.py b/test/azul_test_case.py index 06498636e3..a907f20afa 100644 --- a/test/azul_test_case.py +++ b/test/azul_test_case.py @@ -453,6 +453,7 @@ def catalog_config(cls) -> dict[CatalogName, Config.Catalog]: cls.catalog: config.Catalog(name=cls.catalog, atlas='hca', internal=False, + mirror_max_file_size=None, plugins=dict(metadata=config.Catalog.Plugin(name='hca'), repository=config.Catalog.Plugin(name='dss')), sources={str(cls.source.spec)}) @@ -500,6 +501,7 @@ def catalog_config(cls) -> dict[CatalogName, Config.Catalog]: cls.catalog: config.Catalog(name=cls.catalog, atlas='hca', internal=False, + mirror_max_file_size=None, plugins=dict(metadata=config.Catalog.Plugin(name='hca'), repository=config.Catalog.Plugin(name='tdr_hca')), sources=cls._sources()) @@ -517,6 +519,7 @@ def catalog_config(cls) -> dict[CatalogName, Config.Catalog]: cls.catalog: config.Catalog(name=cls.catalog, atlas='anvil', internal=False, + mirror_max_file_size=None, plugins=dict(metadata=config.Catalog.Plugin(name='anvil'), repository=config.Catalog.Plugin(name='tdr_anvil')), sources={str(cls.source.spec)}) diff --git a/test/indexer/test_mirror_controller.py b/test/indexer/test_mirror_controller.py index ca6895a44d..971a75cc6e 100644 --- a/test/indexer/test_mirror_controller.py +++ b/test/indexer/test_mirror_controller.py @@ -5,6 +5,7 @@ patch, ) +import attrs from chalice.app import ( SQSRecord, ) @@ -44,6 +45,7 @@ ) from azul.types import ( JSON, + MutableJSONs, ) from azul_test_case import ( DCP2TestCase, @@ -125,9 +127,12 @@ def mirror_controller(self) -> MirrorController: def _mirror_event(self, body: JSON) -> list[SQSRecord]: return [self._mock_sqs_record(body, fifo=True)] - def _test_remote_mirror(self): + def _remote_mirror(self) -> MutableJSONs: self.client.remote_mirror(self.catalog, [self.source]) - source_message = one(self._read_queue(self.client.mirror_queue())) + return self._read_queue(self.client.mirror_queue()) + + def _test_remote_mirror(self): + source_message = one(self._remote_mirror()) expected_message = dict(action='mirror_source', catalog=self.catalog, source=self.source.to_json()) @@ -197,3 +202,26 @@ def test_info_schema(self): self.assertEqual(200, response.status, response.data) schema = json.loads(response.data) jsonschema.validate(info, schema) + + def test_files_not_mirrored(self): + self._create_mock_queues(config.mirror_queue_names) + catalog = config.catalogs[self.catalog] + + def patch_max_file_size(size): + return patch.dict(config.catalogs, { + self.catalog: attrs.evolve(catalog, mirror_max_file_size=size) + }) + + with self.subTest(mirror_max_file_size=-1): + with patch_max_file_size(-1): + messages = 
self._remote_mirror() + self.assertEqual([], messages) + + with self.subTest(mirror_max_file_size=self._file.size): + too_big = attrs.evolve(self._file, + uuid='2873c8ef-8f76-4ccf-add7-26afe8c62873', + size=self._file.size + 1) + source_message = self._test_remote_mirror() + partition_message = self._test_mirror_source(source_message) + with patch_max_file_size(self._file.size): + self._test_mirror_partition(partition_message, [too_big, self._file]) diff --git a/test/integration_test.py b/test/integration_test.py index 8e5f47e4f3..eb02d5e69c 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -1716,7 +1716,7 @@ def _test_mirroring(self, *, delete: bool): catalogs = [ c.name for c in config.catalogs.values() - if c.is_integration_test_catalog and c.atlas == 'hca' + if c.is_integration_test_catalog and c.mirror_max_file_size >= 0 ] sources_by_catalog = { catalog: [self._select_source(catalog, public=True)] @@ -1888,6 +1888,7 @@ def test_can_bundle_canned_repository(self): mock_catalog = config.Catalog(name='canned-it', atlas='hca', internal=True, + mirror_max_file_size=None, plugins={ 'metadata': config.Catalog.Plugin(name='hca'), 'repository': config.Catalog.Plugin(name='canned'), From 401f5a25fdef1381bf7cba68bc823b4b33b6b2ba Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Fri, 25 Jul 2025 17:47:54 -0700 Subject: [PATCH 10/14] [u 5/6] Convert source config to dictionary (#7066) --- UPGRADING.rst | 5 +++ deployments/anvilbox/environment.py | 42 +++++++++++++++----------- deployments/anvildev/environment.py | 42 +++++++++++++++----------- deployments/anvilprod/environment.py | 42 +++++++++++++++----------- deployments/dev/environment.py | 38 +++++++++++++---------- deployments/hammerbox/environment.py | 42 +++++++++++++++----------- deployments/prod/environment.py | 38 +++++++++++++---------- deployments/sandbox/environment.py | 38 +++++++++++++---------- deployments/tempdev/environment.py | 38 +++++++++++++---------- environment.py | 8 ++--- scripts/mirror.py | 36 +++++++++++----------- scripts/post_deploy_tdr.py | 2 +- src/azul/__init__.py | 15 +++++---- src/azul/azulclient.py | 25 +++++++++------ src/azul/indexer/__init__.py | 8 +++++ src/azul/plugins/__init__.py | 9 ++++-- test/azul_test_case.py | 6 ++-- test/indexer/test_mirror_controller.py | 5 ++- test/integration_test.py | 29 ++++++++++-------- test/service/test_repository_files.py | 2 +- 20 files changed, 273 insertions(+), 197 deletions(-) diff --git a/UPGRADING.rst b/UPGRADING.rst index 95bb334d11..dc4e5bdb07 100644 --- a/UPGRADING.rst +++ b/UPGRADING.rst @@ -48,6 +48,11 @@ In your personal deployments' ``environment.py`` files: 2. Remove the ``prefix`` parameter and its uses from ``bqsrc`` and ``mksrc``. +2. Update the function body of ``mksrc``. + +3. Update the assignment of the ``sources`` parameter in the ``AZUL_CATALOGS`` + variable. + As always, use the sandbox deployment's ``environment.py`` as a model when upgrading personal deployments. 
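Concretely, the upgrade steps added above change a personal deployment's per-catalog `sources` value from a list of source spec strings to a mapping of spec strings to per-source configuration dictionaries, which are empty for now. A hypothetical before/after, using an illustrative snapshot name rather than a real one:

    # before: mksrcs() produced a list of source spec strings
    sources = [
        'tdr:bigquery:gcp:example-project:hca_dev_example_snapshot',
    ]

    # after: mksrcs() produces a mapping from spec string to source config
    sources = {
        'tdr:bigquery:gcp:example-project:hca_dev_example_snapshot': {},
    }
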
diff --git a/deployments/anvilbox/environment.py b/deployments/anvilbox/environment.py index 2ac4f7c9c8..aa8f80b145 100644 --- a/deployments/anvilbox/environment.py +++ b/deployments/anvilbox/environment.py @@ -12,12 +12,14 @@ type ProjectName = str type SourceSpec = str +type SourceConfig = dict[str, str | int | float | bool | None] +type SourceItem = tuple[SourceSpec, SourceConfig] def bqsrc(google_project: str, snapshot: str, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: assert len(google_project) == 8, google_project project = 'datarepo-dev-' + google_project assert not snapshot.startswith('ANVIL_'), snapshot @@ -28,36 +30,40 @@ def bqsrc(google_project: str, def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: project = '_'.join(snapshot.split('_')[1:-3]) assert flags <= pop - source = None if flags & pop else ':'.join([ - 'tdr', - source_type, - 'gcp', - google_project, - snapshot, - ]) + source = None if flags & pop else ( + ':'.join([ + 'tdr', + source_type, + 'gcp', + google_project, + snapshot, + ]), + {} + ) return project, source -def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] - ) -> dict[ProjectName, SourceSpec | None]: +def mkdelta(items: list[tuple[ProjectName, SourceItem | None]] + ) -> dict[ProjectName, SourceItem | None]: result = dict(items) assert len(items) == len(result), 'collisions detected' assert list(result.keys()) == sorted(result.keys()), 'input not sorted' return result -def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: - return list(filter(None, catalog.values())) +def mksrcs(catalog: dict[ProjectName, SourceItem | None] + ) -> dict[SourceSpec, SourceConfig]: + return dict(filter(None, catalog.values())) -def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], +def mkdict(previous_catalog: dict[ProjectName, SourceItem | None], num_expected: int, - delta: dict[ProjectName, SourceSpec | None], - ) -> dict[ProjectName, SourceSpec | None]: + delta: dict[ProjectName, SourceItem | None], + ) -> dict[ProjectName, SourceItem | None]: catalog = previous_catalog | delta num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py index 75773550ee..a923c09c42 100644 --- a/deployments/anvildev/environment.py +++ b/deployments/anvildev/environment.py @@ -10,12 +10,14 @@ type ProjectName = str type SourceSpec = str +type SourceConfig = dict[str, str | int | float | bool | None] +type SourceItem = tuple[SourceSpec, SourceConfig] def bqsrc(google_project: str, snapshot: str, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: assert len(google_project) == 8, google_project project = 'datarepo-dev-' + google_project assert not snapshot.startswith('ANVIL_'), snapshot @@ -26,36 +28,40 @@ def bqsrc(google_project: str, def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: project = '_'.join(snapshot.split('_')[1:-3]) assert flags <= pop - source = None if flags & pop else ':'.join([ - 'tdr', - source_type, - 'gcp', - google_project, - snapshot, - 
]) + source = None if flags & pop else ( + ':'.join([ + 'tdr', + source_type, + 'gcp', + google_project, + snapshot, + ]), + {} + ) return project, source -def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] - ) -> dict[ProjectName, SourceSpec | None]: +def mkdelta(items: list[tuple[ProjectName, SourceItem | None]] + ) -> dict[ProjectName, SourceItem | None]: result = dict(items) assert len(items) == len(result), 'collisions detected' assert list(result.keys()) == sorted(result.keys()), 'input not sorted' return result -def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: - return list(filter(None, catalog.values())) +def mksrcs(catalog: dict[ProjectName, SourceItem | None] + ) -> dict[SourceSpec, SourceConfig]: + return dict(filter(None, catalog.values())) -def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], +def mkdict(previous_catalog: dict[ProjectName, SourceItem | None], num_expected: int, - delta: dict[ProjectName, SourceSpec | None], - ) -> dict[ProjectName, SourceSpec | None]: + delta: dict[ProjectName, SourceItem | None], + ) -> dict[ProjectName, SourceItem | None]: catalog = previous_catalog | delta num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) diff --git a/deployments/anvilprod/environment.py b/deployments/anvilprod/environment.py index 4f72feb7ee..a9ca936e5a 100644 --- a/deployments/anvilprod/environment.py +++ b/deployments/anvilprod/environment.py @@ -12,12 +12,14 @@ type ProjectName = str type SourceSpec = str +type SourceConfig = dict[str, str | int | float | bool | None] +type SourceItem = tuple[SourceSpec, SourceConfig] def bqsrc(google_project: str, snapshot: str, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: assert len(google_project) == 8, google_project project = 'datarepo-' + google_project # Some snapshots start with AnVIL instead of ANVIL @@ -29,36 +31,40 @@ def bqsrc(google_project: str, def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: project = '_'.join(snapshot.split('_')[1:-3]) assert flags <= pop - source = None if flags & pop else ':'.join([ - 'tdr', - source_type, - 'gcp', - google_project, - snapshot, - ]) + source = None if flags & pop else ( + ':'.join([ + 'tdr', + source_type, + 'gcp', + google_project, + snapshot, + ]), + {} + ) return project, source -def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] - ) -> dict[ProjectName, SourceSpec | None]: +def mkdelta(items: list[tuple[ProjectName, SourceItem | None]] + ) -> dict[ProjectName, SourceItem | None]: result = dict(items) assert len(items) == len(result), 'collisions detected' assert list(result.keys()) == sorted(result.keys()), 'input not sorted' return result -def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: - return list(filter(None, catalog.values())) +def mksrcs(catalog: dict[ProjectName, SourceItem | None] + ) -> dict[SourceSpec, SourceConfig]: + return dict(filter(None, catalog.values())) -def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], +def mkdict(previous_catalog: dict[ProjectName, SourceItem | None], num_expected: int, - delta: dict[ProjectName, SourceSpec | None], - ) -> dict[ProjectName, SourceSpec | None]: + delta: dict[ProjectName, SourceItem | None], + ) -> dict[ProjectName, SourceItem | 
None]: catalog = previous_catalog | delta num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) diff --git a/deployments/dev/environment.py b/deployments/dev/environment.py index e58012b3fa..17d0a46697 100644 --- a/deployments/dev/environment.py +++ b/deployments/dev/environment.py @@ -10,41 +10,47 @@ type ProjectName = str type SourceSpec = str +type SourceConfig = dict[str, str | int | float | bool | None] +type SourceItem = tuple[SourceSpec, SourceConfig] def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: _, env, project, _ = snapshot.split('_', 3) assert flags <= pop - source = None if flags & pop else ':'.join([ - 'tdr', - source_type, - 'gcp', - google_project, - snapshot, - ]) + source = None if flags & pop else ( + ':'.join([ + 'tdr', + source_type, + 'gcp', + google_project, + snapshot, + ]), + {} + ) return project, source -def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] - ) -> dict[ProjectName, SourceSpec | None]: +def mkdelta(items: list[tuple[ProjectName, SourceItem | None]] + ) -> dict[ProjectName, SourceItem | None]: result = dict(items) assert len(items) == len(result), 'collisions detected' assert list(result.keys()) == sorted(result.keys()), 'input not sorted' return result -def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: - return list(filter(None, catalog.values())) +def mksrcs(catalog: dict[ProjectName, SourceItem | None] + ) -> dict[SourceSpec, SourceConfig]: + return dict(filter(None, catalog.values())) -def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], +def mkdict(previous_catalog: dict[ProjectName, SourceItem | None], num_expected: int, - delta: dict[ProjectName, SourceSpec | None], - ) -> dict[ProjectName, SourceSpec | None]: + delta: dict[ProjectName, SourceItem | None], + ) -> dict[ProjectName, SourceItem | None]: catalog = previous_catalog | delta num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) diff --git a/deployments/hammerbox/environment.py b/deployments/hammerbox/environment.py index d6b142f682..9d313e3b4e 100644 --- a/deployments/hammerbox/environment.py +++ b/deployments/hammerbox/environment.py @@ -14,12 +14,14 @@ type ProjectName = str type SourceSpec = str +type SourceConfig = dict[str, str | int | float | bool | None] +type SourceItem = tuple[SourceSpec, SourceConfig] def bqsrc(google_project: str, snapshot: str, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: assert len(google_project) == 8, google_project project = 'datarepo-' + google_project # Some snapshots start with AnVIL instead of ANVIL @@ -31,36 +33,40 @@ def bqsrc(google_project: str, def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: project = '_'.join(snapshot.split('_')[1:-3]) assert flags <= pop - source = None if flags & pop else ':'.join([ - 'tdr', - source_type, - 'gcp', - google_project, - snapshot, - ]) + source = None if flags & pop else ( + ':'.join([ + 'tdr', + source_type, + 'gcp', + google_project, + snapshot, + ]), + {} + ) return project, source -def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] - ) -> dict[ProjectName, 
SourceSpec | None]: +def mkdelta(items: list[tuple[ProjectName, SourceItem | None]] + ) -> dict[ProjectName, SourceItem | None]: result = dict(items) assert len(items) == len(result), 'collisions detected' assert list(result.keys()) == sorted(result.keys()), 'input not sorted' return result -def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: - return list(filter(None, catalog.values())) +def mksrcs(catalog: dict[ProjectName, SourceItem | None] + ) -> dict[SourceSpec, SourceConfig]: + return dict(filter(None, catalog.values())) -def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], +def mkdict(previous_catalog: dict[ProjectName, SourceItem | None], num_expected: int, - delta: dict[ProjectName, SourceSpec | None], - ) -> dict[ProjectName, SourceSpec | None]: + delta: dict[ProjectName, SourceItem | None], + ) -> dict[ProjectName, SourceItem | None]: catalog = previous_catalog | delta num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) diff --git a/deployments/prod/environment.py b/deployments/prod/environment.py index cfb8db4d5f..7e6e6a0a61 100644 --- a/deployments/prod/environment.py +++ b/deployments/prod/environment.py @@ -12,41 +12,47 @@ type ProjectName = str type SourceSpec = str +type SourceConfig = dict[str, str | int | float | bool | None] +type SourceItem = tuple[SourceSpec, SourceConfig] def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: _, env, project, _ = snapshot.split('_', 3) assert flags <= pop - source = None if flags & pop else ':'.join([ - 'tdr', - source_type, - 'gcp', - google_project, - snapshot, - ]) + source = None if flags & pop else ( + ':'.join([ + 'tdr', + source_type, + 'gcp', + google_project, + snapshot, + ]), + {} + ) return project, source -def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] - ) -> dict[ProjectName, SourceSpec | None]: +def mkdelta(items: list[tuple[ProjectName, SourceItem | None]] + ) -> dict[ProjectName, SourceItem | None]: result = dict(items) assert len(items) == len(result), 'collisions detected' assert list(result.keys()) == sorted(result.keys()), 'input not sorted' return result -def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: - return list(filter(None, catalog.values())) +def mksrcs(catalog: dict[ProjectName, SourceItem | None] + ) -> dict[SourceSpec, SourceConfig]: + return dict(filter(None, catalog.values())) -def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], +def mkdict(previous_catalog: dict[ProjectName, SourceItem | None], num_expected: int, - delta: dict[ProjectName, SourceSpec | None], - ) -> dict[ProjectName, SourceSpec | None]: + delta: dict[ProjectName, SourceItem | None], + ) -> dict[ProjectName, SourceItem | None]: catalog = previous_catalog | delta num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) diff --git a/deployments/sandbox/environment.py b/deployments/sandbox/environment.py index c5f2849d0e..6811c610a0 100644 --- a/deployments/sandbox/environment.py +++ b/deployments/sandbox/environment.py @@ -12,41 +12,47 @@ type ProjectName = str type SourceSpec = str +type SourceConfig = dict[str, str | int | float | bool | None] +type SourceItem = tuple[SourceSpec, SourceConfig] def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, - flags: int = 0, - ) -> 
tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: _, env, project, _ = snapshot.split('_', 3) assert flags <= pop - source = None if flags & pop else ':'.join([ - 'tdr', - source_type, - 'gcp', - google_project, - snapshot, - ]) + source = None if flags & pop else ( + ':'.join([ + 'tdr', + source_type, + 'gcp', + google_project, + snapshot, + ]), + {} + ) return project, source -def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] - ) -> dict[ProjectName, SourceSpec | None]: +def mkdelta(items: list[tuple[ProjectName, SourceItem | None]] + ) -> dict[ProjectName, SourceItem | None]: result = dict(items) assert len(items) == len(result), 'collisions detected' assert list(result.keys()) == sorted(result.keys()), 'input not sorted' return result -def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: - return list(filter(None, catalog.values())) +def mksrcs(catalog: dict[ProjectName, SourceItem | None] + ) -> dict[SourceSpec, SourceConfig]: + return dict(filter(None, catalog.values())) -def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], +def mkdict(previous_catalog: dict[ProjectName, SourceItem | None], num_expected: int, - delta: dict[ProjectName, SourceSpec | None], - ) -> dict[ProjectName, SourceSpec | None]: + delta: dict[ProjectName, SourceItem | None], + ) -> dict[ProjectName, SourceItem | None]: catalog = previous_catalog | delta num_actual = len(mksrcs(catalog)) assert num_expected == num_actual, (num_expected, num_actual) diff --git a/deployments/tempdev/environment.py b/deployments/tempdev/environment.py index 25c09085ea..7f6caf207e 100644 --- a/deployments/tempdev/environment.py +++ b/deployments/tempdev/environment.py @@ -10,41 +10,47 @@ type ProjectName = str type SourceSpec = str +type SourceConfig = dict[str, str | int | float | bool | None] +type SourceItem = tuple[SourceSpec, SourceConfig] def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, - flags: int = 0, - ) -> tuple[ProjectName, SourceSpec | None]: + flags: int = 0 + ) -> tuple[ProjectName, SourceItem | None]: _, env, project, _ = snapshot.split('_', 3) assert flags <= pop - source = None if flags & pop else ':'.join([ - 'tdr', - source_type, - 'gcp', - google_project, - snapshot, - ]) + source = None if flags & pop else ( + ':'.join([ + 'tdr', + source_type, + 'gcp', + google_project, + snapshot, + ]), + {} + ) return project, source -def mkdelta(items: list[tuple[ProjectName, SourceSpec | None]] - ) -> dict[ProjectName, SourceSpec | None]: +def mkdelta(items: list[tuple[ProjectName, SourceItem | None]] + ) -> dict[ProjectName, SourceItem | None]: result = dict(items) assert len(items) == len(result), 'collisions detected' assert list(result.keys()) == sorted(result.keys()), 'input not sorted' return result -def mksrcs(catalog: dict[ProjectName, SourceSpec | None]) -> list[SourceSpec]: - return list(filter(None, catalog.values())) +def mksrcs(catalog: dict[ProjectName, SourceItem | None] + ) -> dict[SourceSpec, SourceConfig]: + return dict(filter(None, catalog.values())) -def mkdict(previous_catalog: dict[ProjectName, SourceSpec | None], +def mkdict(previous_catalog: dict[ProjectName, SourceItem | None], num_expected: int, - delta: dict[ProjectName, SourceSpec | None], - ) -> dict[ProjectName, SourceSpec | None]: + delta: dict[ProjectName, SourceItem | None], + ) -> dict[ProjectName, SourceItem | None]: catalog = previous_catalog | delta num_actual = len(mksrcs(catalog)) assert num_expected 
== num_actual, (num_expected, num_actual) diff --git a/environment.py b/environment.py index ecc236ca82..6a2ea9fdaa 100644 --- a/environment.py +++ b/environment.py @@ -50,10 +50,10 @@ def env() -> Mapping[str, str | None]: # plugin_type: {'name'=plugin_package}, # ... # } - # 'sources': [ - # source, - # ... - # ] + # 'sources': { + # source: {} + # ... + # } # }, # ... # } diff --git a/scripts/mirror.py b/scripts/mirror.py index 9dc91f7efd..d8b90816e9 100644 --- a/scripts/mirror.py +++ b/scripts/mirror.py @@ -5,9 +5,6 @@ import argparse import logging import sys -from typing import ( - Iterable, -) from azul import ( CatalogName, @@ -20,9 +17,6 @@ from azul.azulclient import ( AzulClient, ) -from azul.indexer import ( - SourceRef, -) from azul.logging import ( configure_script_logging, ) @@ -44,24 +38,28 @@ def mirror_catalog(azul: AzulClient, source.spec: source for source in plugin.list_sources(authentication=None) } - source_refs: Iterable[SourceRef] + source_specs = azul.matching_sources([catalog], source_globs)[catalog] # When the user doesn't specify a source or provides "*" as a source glob, # we implicitly filter out managed-access sources. This lets us assert that # all sources matching the provided globs are public, without forcing the # user to manually specify every public source. if '*' in source_globs: - source_refs = public_sources_by_spec.values() - else: - source_specs = azul.matching_sources([catalog], source_globs)[catalog] - try: - source_refs = { - public_sources_by_spec[spec] - for spec in source_specs - } - except KeyError as e: - assert False, R( - 'Cannot mirror managed-access source', e.args[0]) - azul.remote_mirror(catalog, source_refs) + source_specs = { + spec: cfg + for spec, cfg in source_specs.items() + if spec in public_sources_by_spec + } + + try: + source_refs = { + public_sources_by_spec[spec]: cfg + for spec, cfg in source_specs.items() + } + except KeyError as e: + assert False, R( + 'Cannot mirror managed-access source', e.args[0]) + + azul.remote_mirror(catalog, source_refs.items()) if wait: azul.wait_for_mirroring() assert azul.is_queue_empty(fail_queue), R( diff --git a/scripts/post_deploy_tdr.py b/scripts/post_deploy_tdr.py index dc8dd14d0f..264efcc804 100644 --- a/scripts/post_deploy_tdr.py +++ b/scripts/post_deploy_tdr.py @@ -76,7 +76,7 @@ def verify_sources(self) -> None: all_sources: set[TDRSourceSpec] = set() with ThreadPoolExecutor(max_workers=8) as tpe: for catalog in self.catalogs: - catalog_sources = self.repository_plugin(catalog).sources + catalog_sources = self.repository_plugin(catalog).sources.keys() for source in catalog_sources - all_sources: futures.append(tpe.submit(self.verify_source, catalog, source)) all_sources |= catalog_sources diff --git a/src/azul/__init__.py b/src/azul/__init__.py index 5cd29d1306..78be11ac77 100644 --- a/src/azul/__init__.py +++ b/src/azul/__init__.py @@ -65,7 +65,6 @@ json_bool, json_int, json_mapping, - json_sequence, json_str, optional, ) @@ -380,7 +379,7 @@ def dss_endpoint(self) -> str | None: else: return None - def sources(self, catalog: CatalogName) -> Set[str]: + def sources(self, catalog: CatalogName) -> Mapping[str, JSON]: return self.catalogs[catalog].sources @property @@ -875,7 +874,7 @@ def replica_conflict_limit(self) -> int: class Catalog: """ >>> plugins = dict(metadata=dict(name='hca'), repository=dict(name='tdr_hca')) - >>> kwargs = dict(atlas='hca', plugins=plugins, sources=[]) + >>> kwargs = dict(atlas='hca', plugins=plugins, sources={}) >>> c = Config.Catalog.from_json >>> 
c(name='dcp', spec=dict(internal=False, **kwargs)) @@ -886,7 +885,7 @@ class Catalog: mirror_max_file_size=None, plugins={'metadata': Config.Catalog.Plugin(name='hca'), 'repository': Config.Catalog.Plugin(name='tdr_hca')}, - sources=set()) + sources={}) >>> c(name='dcp-it', spec=dict(internal=True, **kwargs)).is_integration_test_catalog True @@ -921,7 +920,7 @@ def from_json(cls, spec: JSON) -> Self: internal: bool mirror_max_file_size: int | None plugins: Mapping[str, Plugin] - sources: Set[str] + sources: Mapping[str, JSON] _it_catalog_suffix: ClassVar[str] = '-it' @@ -983,12 +982,16 @@ def from_json(cls, name: str, spec: JSON) -> Self: plugin_type: cls.Plugin.from_json(json_mapping(plugin_spec)) for plugin_type, plugin_spec in json_mapping(spec['plugins']).items() } + sources = { + source_spec: json_mapping(source_config) + for source_spec, source_config in json_mapping(spec['sources']).items() + } return cls(name=name, atlas=json_str(spec['atlas']), internal=json_bool(spec['internal']), mirror_max_file_size=optional(json_int, spec.get('mirror_max_file_size')), plugins=plugins, - sources=set(map(json_str, json_sequence(spec['sources'])))) + sources=sources) @classmethod def validate_name(cls, catalog): diff --git a/src/azul/azulclient.py b/src/azul/azulclient.py index 89edaf02f6..a181251b4c 100644 --- a/src/azul/azulclient.py +++ b/src/azul/azulclient.py @@ -52,6 +52,7 @@ HasCachedHttpClient, ) from azul.indexer import ( + SourceConfig, SourceRef, SourceSpec, ) @@ -232,12 +233,12 @@ def handle_future(future: Future) -> None: def matching_sources(self, catalogs: Iterable[CatalogName], globs: AbstractSet[str] = frozenset('*') - ) -> dict[CatalogName, set[SourceSpec]]: + ) -> dict[CatalogName, dict[SourceSpec, SourceConfig]]: result = {} matched_globs = set() for catalog in catalogs: raw_specs = config.sources(catalog) - specs = set(self.repository_plugin(catalog).sources) + specs = dict(self.repository_plugin(catalog).sources) if '*' not in globs: matching_raw_specs: set[str] = set() for glob in globs: @@ -247,7 +248,7 @@ def matching_sources(self, matched_globs.add(glob) log.debug('Source glob %r matched sources %r in catalog %r', glob, _matching_raw_specs, catalog) - specs = {spec for spec in specs if str(spec) in matching_raw_specs} + specs = {spec: cfg for spec, cfg in specs.items() if str(spec) in matching_raw_specs} result[catalog] = specs unmatched_globs = globs - matched_globs if unmatched_globs: @@ -388,20 +389,24 @@ def is_queue_empty(self, queue_name: str) -> bool: length, _ = self.queues.get_queue_lengths(queues) return length == 0 - def remote_mirror(self, catalog: CatalogName, sources: Iterable[SourceRef]): + def remote_mirror(self, + catalog: CatalogName, + sources: Iterable[tuple[SourceRef, SourceConfig]] + ): + max_file_size = config.catalogs[catalog].mirror_max_file_size if max_file_size is not None and max_file_size < 0: log.info('Not mirroring any files in catalog %r because the file ' 'size limit is negative', catalog) else: - def message(source: SourceRef): - log.info('Mirroring files in source %r from catalog %r', - str(source.spec), catalog) - return self.mirror_source_message(catalog, source) + def messages(): + for source, cfg in sources: + log.info('Mirroring files in source %r from catalog %r', + str(source.spec), catalog) + yield self.mirror_source_message(catalog, source) - messages = map(message, sources) - self.queue_mirror_messages(messages) + self.queue_mirror_messages(messages()) def _get_non_empty_fail_queues(self) -> set[str]: return { diff --git 
a/src/azul/indexer/__init__.py b/src/azul/indexer/__init__.py index 0982d01023..219523bfe8 100644 --- a/src/azul/indexer/__init__.py +++ b/src/azul/indexer/__init__.py @@ -407,6 +407,14 @@ def __contains__(self, partition_prefix: str) -> bool: Prefix.of_everything = Prefix.parse('/0') +@attrs.frozen(kw_only=True) +class SourceConfig(SerializableAttrs): + """ + Configuration + """ + pass + + @attrs.frozen(kw_only=True, order=True) class SourceSpec(Parseable, metaclass=ABCMeta): """ diff --git a/src/azul/plugins/__init__.py b/src/azul/plugins/__init__.py index e1dc44adad..9a645f7d26 100644 --- a/src/azul/plugins/__init__.py +++ b/src/azul/plugins/__init__.py @@ -10,7 +10,6 @@ isabstract, ) from typing import ( - AbstractSet, Callable, ClassVar, Iterable, @@ -48,6 +47,7 @@ from azul.indexer import ( Bundle, Prefix, + SourceConfig, SourceRef, SourceSpec, SourcedBundleFQID, @@ -572,11 +572,14 @@ def create(cls, catalog: CatalogName) -> Self: return cls(catalog=catalog) @cached_property - def sources(self) -> AbstractSet[SOURCE_SPEC]: + def sources(self) -> Mapping[SOURCE_SPEC, SourceConfig]: """ The sources the plugin is configured to read metadata from. """ - return frozenset(map(self.parse_source, config.sources(self.catalog))) + return { + self.parse_source(source_spec): SourceConfig.from_json(source_config) + for source_spec, source_config in config.sources(self.catalog).items() + } def _assert_source(self, source: SOURCE_REF): """ diff --git a/test/azul_test_case.py b/test/azul_test_case.py index a907f20afa..f566feb19b 100644 --- a/test/azul_test_case.py +++ b/test/azul_test_case.py @@ -456,7 +456,7 @@ def catalog_config(cls) -> dict[CatalogName, Config.Catalog]: mirror_max_file_size=None, plugins=dict(metadata=config.Catalog.Plugin(name='hca'), repository=config.Catalog.Plugin(name='dss')), - sources={str(cls.source.spec)}) + sources={str(cls.source.spec): {}}) } @@ -480,7 +480,7 @@ def _patch_tdr_service_url(cls): @classmethod def _sources(cls): - return {str(cls.source.spec)} + return {str(cls.source.spec): {}} @classmethod def _patch_source_cache(cls): @@ -522,5 +522,5 @@ def catalog_config(cls) -> dict[CatalogName, Config.Catalog]: mirror_max_file_size=None, plugins=dict(metadata=config.Catalog.Plugin(name='anvil'), repository=config.Catalog.Plugin(name='tdr_anvil')), - sources={str(cls.source.spec)}) + sources={str(cls.source.spec): {}}) } diff --git a/test/indexer/test_mirror_controller.py b/test/indexer/test_mirror_controller.py index 971a75cc6e..e63ff4c66c 100644 --- a/test/indexer/test_mirror_controller.py +++ b/test/indexer/test_mirror_controller.py @@ -24,6 +24,9 @@ from azul.http import ( http_client, ) +from azul.indexer import ( + SourceConfig, +) from azul.indexer.mirror_controller import ( MirrorController, ) @@ -128,7 +131,7 @@ def _mirror_event(self, body: JSON) -> list[SQSRecord]: return [self._mock_sqs_record(body, fifo=True)] def _remote_mirror(self) -> MutableJSONs: - self.client.remote_mirror(self.catalog, [self.source]) + self.client.remote_mirror(self.catalog, [(self.source, SourceConfig())]) return self._read_queue(self.client.mirror_queue()) def _test_remote_mirror(self): diff --git a/test/integration_test.py b/test/integration_test.py index eb02d5e69c..85cb6e746a 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -118,6 +118,7 @@ ) from azul.indexer import ( Prefix, + SourceConfig, SourceRef, SourceSpec, SourcedBundleFQID, @@ -285,8 +286,8 @@ def managed_access_sources_by_catalog(self if config.is_tdr_enabled(catalog) } 
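# A minimal, self-contained sketch of the per-source configuration shape
# introduced above: each catalog's 'sources' value is now a mapping from a
# source spec string to a small JSON object, which the repository plugin turns
# into a config object via its from_json constructor. This sketch uses plain
# attrs rather than Azul's SerializableAttrs, the spec string is a placeholder,
# and the 'mirror' field anticipates the one added later in this series.
import attrs


@attrs.frozen(kw_only=True)
class SourceConfigSketch:
    mirror: bool

    @classmethod
    def from_json(cls, json: dict) -> 'SourceConfigSketch':
        return cls(**json)


sources = {
    'tdr:bigquery:gcp:some-project:some-snapshot': {'mirror': False},
}
configs = {spec: SourceConfigSketch.from_json(cfg) for spec, cfg in sources.items()}
assert configs['tdr:bigquery:gcp:some-project:some-snapshot'].mirror is False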
managed_access_sources = {catalog: set() for catalog in config.catalogs} - for catalog, specs in configured_sources.items(): - for spec in specs: + for catalog, sources in configured_sources.items(): + for spec, _ in sources.items(): source_id = one(id for id, name in all_sources.items() if name == spec.name) if source_id not in public_sources: ref = TDRSourceRef(id=source_id, spec=spec, prefix=None) @@ -297,7 +298,7 @@ def _select_source(self, catalog: CatalogName, *, public: bool | None = None - ) -> SourceRef | None: + ) -> tuple[SourceRef, SourceConfig] | None: """ Choose an indexed source at random. @@ -323,27 +324,27 @@ def _select_source(self, # it's actually needed for source in self.managed_access_sources_by_catalog[catalog] } - self.assertIsSubset(ma_sources, sources) + self.assertIsSubset(ma_sources, sources.keys()) - def _filter(source: SourceSpec) -> bool: + def _filter(source: tuple[SourceSpec, SourceConfig]) -> bool: if public is None: valid = True elif public is True: - valid = source not in ma_sources + valid = source[0] not in ma_sources elif public is False: - valid = source in ma_sources + valid = source[0] in ma_sources else: assert False, public return valid - sources = set(filter(_filter, sources)) + sources = dict(filter(_filter, sources.items())) if len(sources) == 0: assert public is False, 'An IT catalog must contain at least one public source' return None else: - source = self.random.choice(sorted(sources)) - return plugin.resolve_source(source) + source, cfg = self.random.choice(sorted(sources.items())) + return plugin.resolve_source(source), cfg class IndexingIntegrationTest(IntegrationTestCase): @@ -455,8 +456,10 @@ class Catalog: catalogs: list[Catalog] = [] for catalog in config.integration_test_catalogs: if index: - public_source = self._select_source(catalog, public=True) + public_source, _ = self._select_source(catalog, public=True) ma_source = self._select_source(catalog, public=False) + if ma_source is not None: + ma_source = ma_source[0] sources = alist(public_source, ma_source) notifications, fqids = self._prepare_notifications(catalog, sources) else: @@ -1894,7 +1897,7 @@ def test_can_bundle_canned_repository(self): 'repository': config.Catalog.Plugin(name='canned'), }, sources={ - 'https://github.com/HumanCellAtlas/schema-test-data/tree/master/tests' + 'https://github.com/HumanCellAtlas/schema-test-data/tree/master/tests': {} }) with mock.patch.object(Config, 'catalogs', @@ -1904,7 +1907,7 @@ def test_can_bundle_canned_repository(self): self._test_catalog(mock_catalog) def bundle_fqid(self, catalog: CatalogName) -> SourcedBundleFQID: - source = self._select_source(catalog) + source, _ = self._select_source(catalog) # The plugin will raise an exception if the source lacks a prefix source = source.with_prefix(Prefix.of_everything) bundle_fqids = self.azul_client.index_repository_service.list_bundles(catalog, source, prefix='') diff --git a/test/service/test_repository_files.py b/test/service/test_repository_files.py index 513e481c5f..c1b106a54f 100644 --- a/test/service/test_repository_files.py +++ b/test/service/test_repository_files.py @@ -195,7 +195,7 @@ def test_repository_files(self, mock_get_cached_sources): @classmethod def _sources(cls): - return set(map(cls.make_mock_source_spec, cls.mock_source_names)) + return {cls.make_mock_source_spec(n): {} for n in cls.mock_source_names} @mock.patch.object(TDRClient, 'snapshot_names_by_id') @mock.patch.object(TDRClient, 'validate', new=MagicMock()) From 28cb74ebf6b06b2ca383e48a24c01693add0be54 Mon Sep 
17 00:00:00 2001 From: Noa Dove Date: Thu, 16 Oct 2025 20:54:54 -0700 Subject: [PATCH 11/14] [u 6/6] Configure mirroring per source (#7066) --- deployments/anvilbox/environment.py | 6 +++++- deployments/anvildev/environment.py | 6 +++++- deployments/anvilprod/environment.py | 6 +++++- deployments/dev/environment.py | 6 +++++- deployments/hammerbox/environment.py | 6 +++++- deployments/prod/environment.py | 6 +++++- deployments/sandbox/environment.py | 6 +++++- deployments/tempdev/environment.py | 6 +++++- src/azul/azulclient.py | 11 ++++++++--- src/azul/indexer/__init__.py | 2 +- test/azul_test_case.py | 6 +++--- test/indexer/test_mirror_controller.py | 10 ++++++++-- test/integration_test.py | 16 +++++++++++++--- test/service/test_repository_files.py | 2 +- 14 files changed, 74 insertions(+), 21 deletions(-) diff --git a/deployments/anvilbox/environment.py b/deployments/anvilbox/environment.py index aa8f80b145..8b3bce38a2 100644 --- a/deployments/anvilbox/environment.py +++ b/deployments/anvilbox/environment.py @@ -9,6 +9,7 @@ is_sandbox = True pop = 1 # remove snapshot +no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots) type ProjectName = str type SourceSpec = str @@ -42,8 +43,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, ]), - {} + { + 'mirror': not (flags & no_mirror), + } ) + return project, source diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py index a923c09c42..644cd6ae25 100644 --- a/deployments/anvildev/environment.py +++ b/deployments/anvildev/environment.py @@ -7,6 +7,7 @@ ) pop = 1 # remove snapshot +no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots) type ProjectName = str type SourceSpec = str @@ -40,8 +41,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, ]), - {} + { + 'mirror': not (flags & no_mirror), + } ) + return project, source diff --git a/deployments/anvilprod/environment.py b/deployments/anvilprod/environment.py index a9ca936e5a..e2faf15b58 100644 --- a/deployments/anvilprod/environment.py +++ b/deployments/anvilprod/environment.py @@ -9,6 +9,7 @@ ) pop = 1 # remove snapshot +no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots) type ProjectName = str type SourceSpec = str @@ -43,8 +44,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, ]), - {} + { + 'mirror': not (flags & no_mirror), + } ) + return project, source diff --git a/deployments/dev/environment.py b/deployments/dev/environment.py index 17d0a46697..d92056bf09 100644 --- a/deployments/dev/environment.py +++ b/deployments/dev/environment.py @@ -7,6 +7,7 @@ ) pop = 1 # remove snapshot +no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots) type ProjectName = str type SourceSpec = str @@ -29,8 +30,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, ]), - {} + { + 'mirror': not (flags & no_mirror), + } ) + return project, source diff --git a/deployments/hammerbox/environment.py b/deployments/hammerbox/environment.py index 9d313e3b4e..ba25a1337d 100644 --- a/deployments/hammerbox/environment.py +++ b/deployments/hammerbox/environment.py @@ -11,6 +11,7 @@ is_sandbox = True pop = 1 # remove snapshot +no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots) type ProjectName = str type SourceSpec = str @@ -45,8 +46,11 @@ def mksrc(source_type: 
Literal['bigquery', 'parquet'], google_project, snapshot, ]), - {} + { + 'mirror': not (flags & no_mirror), + } ) + return project, source diff --git a/deployments/prod/environment.py b/deployments/prod/environment.py index 7e6e6a0a61..048f87d3b3 100644 --- a/deployments/prod/environment.py +++ b/deployments/prod/environment.py @@ -9,6 +9,7 @@ ) pop = 1 # remove snapshot +no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots) type ProjectName = str type SourceSpec = str @@ -31,8 +32,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, ]), - {} + { + 'mirror': not (flags & no_mirror), + } ) + return project, source diff --git a/deployments/sandbox/environment.py b/deployments/sandbox/environment.py index 6811c610a0..48f7d106d3 100644 --- a/deployments/sandbox/environment.py +++ b/deployments/sandbox/environment.py @@ -9,6 +9,7 @@ is_sandbox = True pop = 1 # remove snapshot +no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots) type ProjectName = str type SourceSpec = str @@ -31,8 +32,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, ]), - {} + { + 'mirror': not (flags & no_mirror), + } ) + return project, source diff --git a/deployments/tempdev/environment.py b/deployments/tempdev/environment.py index 7f6caf207e..9f15117ba7 100644 --- a/deployments/tempdev/environment.py +++ b/deployments/tempdev/environment.py @@ -7,6 +7,7 @@ ) pop = 1 # remove snapshot +no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots) type ProjectName = str type SourceSpec = str @@ -29,8 +30,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'], google_project, snapshot, ]), - {} + { + 'mirror': not (flags & no_mirror), + } ) + return project, source diff --git a/src/azul/azulclient.py b/src/azul/azulclient.py index a181251b4c..0a26de0250 100644 --- a/src/azul/azulclient.py +++ b/src/azul/azulclient.py @@ -402,9 +402,14 @@ def remote_mirror(self, def messages(): for source, cfg in sources: - log.info('Mirroring files in source %r from catalog %r', - str(source.spec), catalog) - yield self.mirror_source_message(catalog, source) + if cfg.mirror: + log.info('Mirroring files in source %r from catalog %r', + str(source.spec), catalog) + yield self.mirror_source_message(catalog, source) + else: + log.info('Not mirroring any files in source %r from catalog %r because ' + 'mirroring is explicitly disabled', + str(source.spec), catalog) self.queue_mirror_messages(messages()) diff --git a/src/azul/indexer/__init__.py b/src/azul/indexer/__init__.py index 219523bfe8..a70a065468 100644 --- a/src/azul/indexer/__init__.py +++ b/src/azul/indexer/__init__.py @@ -412,7 +412,7 @@ class SourceConfig(SerializableAttrs): """ Configuration """ - pass + mirror: bool @attrs.frozen(kw_only=True, order=True) diff --git a/test/azul_test_case.py b/test/azul_test_case.py index f566feb19b..1020b0af00 100644 --- a/test/azul_test_case.py +++ b/test/azul_test_case.py @@ -456,7 +456,7 @@ def catalog_config(cls) -> dict[CatalogName, Config.Catalog]: mirror_max_file_size=None, plugins=dict(metadata=config.Catalog.Plugin(name='hca'), repository=config.Catalog.Plugin(name='dss')), - sources={str(cls.source.spec): {}}) + sources={str(cls.source.spec): {'mirror': True}}) } @@ -480,7 +480,7 @@ def _patch_tdr_service_url(cls): @classmethod def _sources(cls): - return {str(cls.source.spec): {}} + return {str(cls.source.spec): {'mirror': True}} @classmethod def 
_patch_source_cache(cls): @@ -522,5 +522,5 @@ def catalog_config(cls) -> dict[CatalogName, Config.Catalog]: mirror_max_file_size=None, plugins=dict(metadata=config.Catalog.Plugin(name='anvil'), repository=config.Catalog.Plugin(name='tdr_anvil')), - sources={str(cls.source.spec): {}}) + sources={str(cls.source.spec): {'mirror': True}}) } diff --git a/test/indexer/test_mirror_controller.py b/test/indexer/test_mirror_controller.py index e63ff4c66c..9541437eb9 100644 --- a/test/indexer/test_mirror_controller.py +++ b/test/indexer/test_mirror_controller.py @@ -130,8 +130,9 @@ def mirror_controller(self) -> MirrorController: def _mirror_event(self, body: JSON) -> list[SQSRecord]: return [self._mock_sqs_record(body, fifo=True)] - def _remote_mirror(self) -> MutableJSONs: - self.client.remote_mirror(self.catalog, [(self.source, SourceConfig())]) + def _remote_mirror(self, mirror_source_cfg: bool = True) -> MutableJSONs: + cfg = SourceConfig(mirror=mirror_source_cfg) + self.client.remote_mirror(self.catalog, [(self.source, cfg)]) return self._read_queue(self.client.mirror_queue()) def _test_remote_mirror(self): @@ -208,6 +209,11 @@ def test_info_schema(self): def test_files_not_mirrored(self): self._create_mock_queues(config.mirror_queue_names) + + with self.subTest(no_mirror=True): + messages = self._remote_mirror(mirror_source_cfg=False) + self.assertEqual([], messages) + catalog = config.catalogs[self.catalog] def patch_max_file_size(size): diff --git a/test/integration_test.py b/test/integration_test.py index 85cb6e746a..a63ed64fb7 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -297,7 +297,8 @@ def managed_access_sources_by_catalog(self def _select_source(self, catalog: CatalogName, *, - public: bool | None = None + public: bool | None = None, + mirror: bool = False, ) -> tuple[SourceRef, SourceConfig] | None: """ Choose an indexed source at random. @@ -310,6 +311,11 @@ def _select_source(self, public sources. If false, choose a non-public source, or return `None` if the catalog contains no non-public sources. + + :param mirror: If true, choose a source where the `no_mirror` flag is + not present, or return `None` if the catalog contains no + such source. If false, choose a source regardless of + whether this flag is present. 
""" plugin = self.repository_plugin(catalog) sources = plugin.sources @@ -335,6 +341,8 @@ def _filter(source: tuple[SourceSpec, SourceConfig]) -> bool: valid = source[0] in ma_sources else: assert False, public + if mirror: + valid &= source[1].mirror return valid sources = dict(filter(_filter, sources.items())) @@ -1722,7 +1730,7 @@ def _test_mirroring(self, *, delete: bool): if c.is_integration_test_catalog and c.mirror_max_file_size >= 0 ] sources_by_catalog = { - catalog: [self._select_source(catalog, public=True)] + catalog: [self._select_source(catalog, public=True, mirror=True)] for catalog in catalogs } @@ -1897,7 +1905,9 @@ def test_can_bundle_canned_repository(self): 'repository': config.Catalog.Plugin(name='canned'), }, sources={ - 'https://github.com/HumanCellAtlas/schema-test-data/tree/master/tests': {} + 'https://github.com/HumanCellAtlas/schema-test-data/tree/master/tests': { + 'mirror': False + }, }) with mock.patch.object(Config, 'catalogs', diff --git a/test/service/test_repository_files.py b/test/service/test_repository_files.py index c1b106a54f..4bf89b6086 100644 --- a/test/service/test_repository_files.py +++ b/test/service/test_repository_files.py @@ -195,7 +195,7 @@ def test_repository_files(self, mock_get_cached_sources): @classmethod def _sources(cls): - return {cls.make_mock_source_spec(n): {} for n in cls.mock_source_names} + return {cls.make_mock_source_spec(n): {'mirror': True} for n in cls.mock_source_names} @mock.patch.object(TDRClient, 'snapshot_names_by_id') @mock.patch.object(TDRClient, 'validate', new=MagicMock()) From 9c9b20d4830d23a7f6084f95b0fd9e9c3ab655a3 Mon Sep 17 00:00:00 2001 From: Noa Dove Date: Thu, 30 Oct 2025 18:17:35 -0700 Subject: [PATCH 12/14] Verify download size in mirror service --- src/azul/indexer/mirror_service.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/azul/indexer/mirror_service.py b/src/azul/indexer/mirror_service.py index d48f049310..1514bda1f7 100644 --- a/src/azul/indexer/mirror_service.py +++ b/src/azul/indexer/mirror_service.py @@ -365,8 +365,10 @@ def _download(self, file: File, part: FilePart | None = None) -> bytes: str(download_url), headers=headers) if response.status == expected_status: + actual_size = len(response.data) log.info('Downloaded %d bytes in %.3fs from file %r', - size, time.time() - start, file) + actual_size, time.time() - start, file) + assert actual_size == size, R(f'Expected {size} bytes, got {actual_size}') return response.data else: raise RuntimeError('Unexpected response from repository', response.status) From 40e88db34e2d2b9c53c81052433a6d2f02c58fee Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Mon, 16 Jun 2025 13:31:24 -0700 Subject: [PATCH 13/14] [r] Fix: AnVIL uses uncommon encoding for MD5 digests (#7154) --- .../plugins/repository/tdr_anvil/__init__.py | 18 +++++- ...604d-ab34-af39-f5b9f5d61818.tdr.anvil.json | 6 +- ...2-e274-affe-aabc-eb3db63ad068.results.json | 56 +++++++++---------- ...e274-affe-aabc-eb3db63ad068.tdr.anvil.json | 4 +- .../manifest/verbatim/jsonl/anvil/linked.json | 6 +- .../verbatim/pfb/anvil/pfb_entities.json | 6 +- test/service/test_manifest.py | 6 +- test/service/test_response_anvil.py | 4 +- 8 files changed, 61 insertions(+), 45 deletions(-) diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py index 3c839580e0..b0c6ec1206 100644 --- a/src/azul/plugins/repository/tdr_anvil/__init__.py +++ b/src/azul/plugins/repository/tdr_anvil/__init__.py @@ -351,7 +351,15 @@ def 
list_files(self, source: TDRSourceRef, prefix: str) -> list[AnvilFile]: batch = self._get_batch(source.spec, 'anvil_file', prefix, - key_column='file_md5sum') + key_column=self._column_from_64_to_hex('file_md5sum')) + + def missing_md5(row: BigQueryRow) -> bool: + missing = row['file_md5sum'] is None + if missing: + assert source.spec.name == 'ANVIL_1000G_2019_Dev_20230609_ANV5_202306121732', R( + 'File lacks MD5 digest', source, dict(row)) + return missing + return [ AnvilFile(uuid=ref.entity_id, name=row['file_name'], @@ -360,6 +368,7 @@ def list_files(self, source: TDRSourceRef, prefix: str) -> list[AnvilFile]: md5=row['file_md5sum'], drs_uri=row['file_ref']) for ref, row in batch + if not missing_md5(row) ] def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: @@ -953,4 +962,11 @@ def _columns(self, table_name: str) -> set[str]: else: columns = set(columns) columns.add('datarepo_row_id') + if table_name == 'anvil_file': + column = 'file_md5sum' + columns.remove(column) + columns.add(f'{self._column_from_64_to_hex(column)} AS {column}') return columns + + def _column_from_64_to_hex(self, column: str) -> str: + return f'TO_HEX(FROM_BASE64({column}))' diff --git a/test/indexer/data/595c469e-604d-ab34-af39-f5b9f5d61818.tdr.anvil.json b/test/indexer/data/595c469e-604d-ab34-af39-f5b9f5d61818.tdr.anvil.json index dd3946fbcc..94fc5ac1fe 100644 --- a/test/indexer/data/595c469e-604d-ab34-af39-f5b9f5d61818.tdr.anvil.json +++ b/test/indexer/data/595c469e-604d-ab34-af39-f5b9f5d61818.tdr.anvil.json @@ -27,7 +27,7 @@ "data_modality": [], "datarepo_row_id": "6b0f6c0f-5d80-4242-accb-840921351cd5", "file_format": ".txt", - "file_md5sum": "S/GBrRjzZAQYqh3rdiPYzA==", + "file_md5sum": "4bf181ad18f3640418aa1deb7623d8cc", "file_id": "1fab11f5-7eab-4318-9a58-68d8d06e0715", "file_name": "CCDG_13607_B01_GRM_WGS_2019-02-19_chr15.recalibrated_variants.annotated.coding.txt", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1fab11f5-7eab-4318-9a58-68d8d06e0715", @@ -60,7 +60,7 @@ "datarepo_row_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", "file_format": ".vcf.gz", "file_id": "1e269f04-4347-4188-b060-1dcc69e71d67", - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "file_name": "307500.merged.matefixed.sorted.markeddups.recal.g.vcf.gz", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67", "file_size": 213021639, @@ -79,7 +79,7 @@ "datarepo_row_id": "3b17377b-16b1-431c-9967-e5d01fc5923f", "file_format": ".bam", "file_id": "8b722e88-8103-49c1-b351-e64fa7c6ab37", - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "file_name": "307500.merged.matefixed.sorted.markeddups.recal.bam", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37", "file_size": 3306845592, diff --git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json index 90af57452d..2fbd7a0bbc 100644 --- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json +++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json @@ -183,7 +183,7 @@ "file_size": 213021639, "file_size_": 213021639, "file_md5sum": [ - "vuxgbuCqKZ/fkT9CWTFmIg==" + "beec606ee0aa299fdf913f4259316622" ], "reference_assembly": [ "~null" @@ -390,7 +390,7 @@ "file_format": ".vcf.gz", "file_size": 213021639, "file_size_": 213021639, - 
"file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "reference_assembly": [ "~null" ], @@ -637,7 +637,7 @@ "file_format": ".vcf.gz", "file_size": 213021639, "file_size_": 213021639, - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "reference_assembly": [ "~null" ], @@ -828,7 +828,7 @@ "file_format": ".vcf.gz", "file_size": 213021639, "file_size_": 213021639, - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "reference_assembly": [ "~null" ], @@ -856,7 +856,7 @@ }, { "_index": "azul_v2_nadove4_test_replica", - "_id": "anvil_file_15b76f9c-6b46-433f-851d-34e89f1b9ba6_aec7ac65991cc6ea974a7a86fc30ec96c081cfcf", + "_id": "anvil_file_15b76f9c-6b46-433f-851d-34e89f1b9ba6_5d1a78bcffdd1ab9966053541cf7aba09f6d8353", "_score": 1.0, "_source": { "entity_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", @@ -876,7 +876,7 @@ "datarepo_row_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", "file_format": ".vcf.gz", "file_id": "1e269f04-4347-4188-b060-1dcc69e71d67", - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "file_name": "307500.merged.matefixed.sorted.markeddups.recal.g.vcf.gz", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67", "file_size": 213021639, @@ -1123,7 +1123,7 @@ "file_size": 213021639, "file_size_": 213021639, "file_md5sum": [ - "vuxgbuCqKZ/fkT9CWTFmIg==" + "beec606ee0aa299fdf913f4259316622" ], "reference_assembly": [ "~null" @@ -1170,7 +1170,7 @@ "file_size": 3306845592, "file_size_": 3306845592, "file_md5sum": [ - "fNn9e1SovzgOROk3BvH6LQ==" + "7cd9fd7b54a8bf380e44e93706f1fa2d" ], "reference_assembly": [ "~null" @@ -1395,7 +1395,7 @@ "file_format": ".vcf.gz", "file_size": 213021639, "file_size_": 213021639, - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "reference_assembly": [ "~null" ], @@ -1419,7 +1419,7 @@ "file_format": ".bam", "file_size": 3306845592, "file_size_": 3306845592, - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "reference_assembly": [ "~null" ], @@ -1671,7 +1671,7 @@ "file_format": ".bam", "file_size": 3306845592, "file_size_": 3306845592, - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "reference_assembly": [ "~null" ], @@ -1704,7 +1704,7 @@ }, { "_index": "azul_v2_nadove4_test_replica", - "_id": "anvil_file_3b17377b-16b1-431c-9967-e5d01fc5923f_669b7664f4b6865a0604285f923995acaf84b4ab", + "_id": "anvil_file_3b17377b-16b1-431c-9967-e5d01fc5923f_f5083a9b6e1e772f4f6fd89e329070fa23885d18", "_score": 1.0, "_source": { "entity_id": "3b17377b-16b1-431c-9967-e5d01fc5923f", @@ -1713,7 +1713,7 @@ "datarepo_row_id": "3b17377b-16b1-431c-9967-e5d01fc5923f", "file_format": ".bam", "file_id": "8b722e88-8103-49c1-b351-e64fa7c6ab37", - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "file_name": "307500.merged.matefixed.sorted.markeddups.recal.bam", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37", "file_size": 3306845592, @@ -1900,7 +1900,7 @@ "file_format": ".bam", "file_size": 3306845592, "file_size_": 3306845592, - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "reference_assembly": [ "~null" ], @@ -2110,7 +2110,7 @@ "file_size": 3306845592, 
"file_size_": 3306845592, "file_md5sum": [ - "fNn9e1SovzgOROk3BvH6LQ==" + "7cd9fd7b54a8bf380e44e93706f1fa2d" ], "reference_assembly": [ "~null" @@ -2353,7 +2353,7 @@ "file_format": ".bam", "file_size": 3306845592, "file_size_": 3306845592, - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "reference_assembly": [ "~null" ], @@ -2560,7 +2560,7 @@ "file_size": 213021639, "file_size_": 213021639, "file_md5sum": [ - "vuxgbuCqKZ/fkT9CWTFmIg==" + "beec606ee0aa299fdf913f4259316622" ], "reference_assembly": [ "~null" @@ -2607,7 +2607,7 @@ "file_size": 3306845592, "file_size_": 3306845592, "file_md5sum": [ - "fNn9e1SovzgOROk3BvH6LQ==" + "7cd9fd7b54a8bf380e44e93706f1fa2d" ], "reference_assembly": [ "~null" @@ -2832,7 +2832,7 @@ "file_format": ".vcf.gz", "file_size": 213021639, "file_size_": 213021639, - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "reference_assembly": [ "~null" ], @@ -2856,7 +2856,7 @@ "file_format": ".bam", "file_size": 3306845592, "file_size_": 3306845592, - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "reference_assembly": [ "~null" ], @@ -3121,7 +3121,7 @@ "file_size": 213021639, "file_size_": 213021639, "file_md5sum": [ - "vuxgbuCqKZ/fkT9CWTFmIg==" + "beec606ee0aa299fdf913f4259316622" ], "reference_assembly": [ "~null" @@ -3168,7 +3168,7 @@ "file_size": 3306845592, "file_size_": 3306845592, "file_md5sum": [ - "fNn9e1SovzgOROk3BvH6LQ==" + "7cd9fd7b54a8bf380e44e93706f1fa2d" ], "reference_assembly": [ "~null" @@ -3393,7 +3393,7 @@ "file_format": ".vcf.gz", "file_size": 213021639, "file_size_": 213021639, - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "reference_assembly": [ "~null" ], @@ -3417,7 +3417,7 @@ "file_format": ".bam", "file_size": 3306845592, "file_size_": 3306845592, - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "reference_assembly": [ "~null" ], @@ -3672,7 +3672,7 @@ "file_size": 213021639, "file_size_": 213021639, "file_md5sum": [ - "vuxgbuCqKZ/fkT9CWTFmIg==" + "beec606ee0aa299fdf913f4259316622" ], "reference_assembly": [ "~null" @@ -3719,7 +3719,7 @@ "file_size": 3306845592, "file_size_": 3306845592, "file_md5sum": [ - "fNn9e1SovzgOROk3BvH6LQ==" + "7cd9fd7b54a8bf380e44e93706f1fa2d" ], "reference_assembly": [ "~null" @@ -3944,7 +3944,7 @@ "file_format": ".vcf.gz", "file_size": 213021639, "file_size_": 213021639, - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "reference_assembly": [ "~null" ], @@ -3968,7 +3968,7 @@ "file_format": ".bam", "file_size": 3306845592, "file_size_": 3306845592, - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "reference_assembly": [ "~null" ], diff --git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json index 06c595df47..e12c336b57 100644 --- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json +++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.tdr.anvil.json @@ -111,7 +111,7 @@ "datarepo_row_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", "file_format": ".vcf.gz", "file_id": "1e269f04-4347-4188-b060-1dcc69e71d67", - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "file_name": 
"307500.merged.matefixed.sorted.markeddups.recal.g.vcf.gz", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67", "file_size": 213021639, @@ -130,7 +130,7 @@ "datarepo_row_id": "3b17377b-16b1-431c-9967-e5d01fc5923f", "file_format": ".bam", "file_id": "8b722e88-8103-49c1-b351-e64fa7c6ab37", - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "file_name": "307500.merged.matefixed.sorted.markeddups.recal.bam", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37", "file_size": 3306845592, diff --git a/test/service/data/manifest/verbatim/jsonl/anvil/linked.json b/test/service/data/manifest/verbatim/jsonl/anvil/linked.json index b8fbe5fff3..a9129f0c27 100644 --- a/test/service/data/manifest/verbatim/jsonl/anvil/linked.json +++ b/test/service/data/manifest/verbatim/jsonl/anvil/linked.json @@ -127,7 +127,7 @@ "datarepo_row_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", "file_format": ".vcf.gz", "file_id": "1e269f04-4347-4188-b060-1dcc69e71d67", - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "file_name": "307500.merged.matefixed.sorted.markeddups.recal.g.vcf.gz", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67", "file_size": 213021639, @@ -149,7 +149,7 @@ "datarepo_row_id": "3b17377b-16b1-431c-9967-e5d01fc5923f", "file_format": ".bam", "file_id": "8b722e88-8103-49c1-b351-e64fa7c6ab37", - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "file_name": "307500.merged.matefixed.sorted.markeddups.recal.bam", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37", "file_size": 3306845592, @@ -210,7 +210,7 @@ "data_modality": [], "datarepo_row_id": "6b0f6c0f-5d80-4242-accb-840921351cd5", "file_format": ".txt", - "file_md5sum": "S/GBrRjzZAQYqh3rdiPYzA==", + "file_md5sum": "4bf181ad18f3640418aa1deb7623d8cc", "file_id": "1fab11f5-7eab-4318-9a58-68d8d06e0715", "file_name": "CCDG_13607_B01_GRM_WGS_2019-02-19_chr15.recalibrated_variants.annotated.coding.txt", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1fab11f5-7eab-4318-9a58-68d8d06e0715", diff --git a/test/service/data/manifest/verbatim/pfb/anvil/pfb_entities.json b/test/service/data/manifest/verbatim/pfb/anvil/pfb_entities.json index 23b92f5a23..a409cd7633 100644 --- a/test/service/data/manifest/verbatim/pfb/anvil/pfb_entities.json +++ b/test/service/data/manifest/verbatim/pfb/anvil/pfb_entities.json @@ -247,7 +247,7 @@ "drs_uri": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67", "file_format": ".vcf.gz", "file_id": "1e269f04-4347-4188-b060-1dcc69e71d67", - "file_md5sum": "vuxgbuCqKZ/fkT9CWTFmIg==", + "file_md5sum": "beec606ee0aa299fdf913f4259316622", "file_name": "307500.merged.matefixed.sorted.markeddups.recal.g.vcf.gz", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1e269f04-4347-4188-b060-1dcc69e71d67", "file_size": 213021639, @@ -294,7 +294,7 @@ "drs_uri": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1fab11f5-7eab-4318-9a58-68d8d06e0715", "file_format": ".txt", "file_id": "1fab11f5-7eab-4318-9a58-68d8d06e0715", - "file_md5sum": "S/GBrRjzZAQYqh3rdiPYzA==", + "file_md5sum": "4bf181ad18f3640418aa1deb7623d8cc", "file_name": 
"CCDG_13607_B01_GRM_WGS_2019-02-19_chr15.recalibrated_variants.annotated.coding.txt", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_1fab11f5-7eab-4318-9a58-68d8d06e0715", "file_size": 15079345, @@ -419,7 +419,7 @@ "drs_uri": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37", "file_format": ".bam", "file_id": "8b722e88-8103-49c1-b351-e64fa7c6ab37", - "file_md5sum": "fNn9e1SovzgOROk3BvH6LQ==", + "file_md5sum": "7cd9fd7b54a8bf380e44e93706f1fa2d", "file_name": "307500.merged.matefixed.sorted.markeddups.recal.bam", "file_ref": "drs://mock_tdr.lan/v1_6c87f0e1-509d-46a4-b845-7584df39263b_8b722e88-8103-49c1-b351-e64fa7c6ab37", "file_size": 3306845592, diff --git a/test/service/test_manifest.py b/test/service/test_manifest.py index d81dd0b2d3..3f243b53a8 100644 --- a/test/service/test_manifest.py +++ b/test/service/test_manifest.py @@ -1691,9 +1691,9 @@ def test_compact_manifest(self): ), ( 'files.file_md5sum', - 'S/GBrRjzZAQYqh3rdiPYzA==', - 'vuxgbuCqKZ/fkT9CWTFmIg==', - 'fNn9e1SovzgOROk3BvH6LQ==' + '4bf181ad18f3640418aa1deb7623d8cc', + 'beec606ee0aa299fdf913f4259316622', + '7cd9fd7b54a8bf380e44e93706f1fa2d' ), ( 'files.reference_assembly', diff --git a/test/service/test_response_anvil.py b/test/service/test_response_anvil.py index 929242674c..e047f84b60 100644 --- a/test/service/test_response_anvil.py +++ b/test/service/test_response_anvil.py @@ -1261,7 +1261,7 @@ def test_entity_indices(self): 'data_modality': [None], 'file_format': '.vcf.gz', 'file_size': 213021639, - 'file_md5sum': 'vuxgbuCqKZ/fkT9CWTFmIg==', + 'file_md5sum': 'beec606ee0aa299fdf913f4259316622', 'reference_assembly': [None], 'file_name': '307500.merged.matefixed.sorted.markeddups.recal.g.vcf.gz', 'is_supplementary': False, @@ -1344,7 +1344,7 @@ def test_entity_indices(self): 'data_modality': [None], 'file_format': '.bam', 'file_size': 3306845592, - 'file_md5sum': 'fNn9e1SovzgOROk3BvH6LQ==', + 'file_md5sum': '7cd9fd7b54a8bf380e44e93706f1fa2d', 'reference_assembly': [None], 'file_name': '307500.merged.matefixed.sorted.markeddups.recal.bam', 'is_supplementary': False, From 4b414340840cf01206fe923ef536103521b2df76 Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Mon, 16 Jun 2025 13:53:19 -0700 Subject: [PATCH 14/14] Enable mirroring in anvildev and anvilbox (#7214) --- deployments/anvilbox/environment.py | 2 ++ deployments/anvildev/environment.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/deployments/anvilbox/environment.py b/deployments/anvilbox/environment.py index 8b3bce38a2..b5bb329f69 100644 --- a/deployments/anvilbox/environment.py +++ b/deployments/anvilbox/environment.py @@ -178,4 +178,6 @@ def env() -> Mapping[str, str | None]: 'AZUL_DEPLOYMENT_INCARNATION': '2', 'AZUL_GOOGLE_OAUTH2_CLIENT_ID': '561542988117-cpo2avhomdh6t7fetp91js78cdhm9p47.apps.googleusercontent.com', + + 'AZUL_ENABLE_MIRRORING': '1', } diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py index 644cd6ae25..86f54b9378 100644 --- a/deployments/anvildev/environment.py +++ b/deployments/anvildev/environment.py @@ -155,6 +155,8 @@ def env() -> Mapping[str, str | None]: 'AZUL_GOOGLE_OAUTH2_CLIENT_ID': '561542988117-3cv4g8ii9enl2000ra6m02r3ne7bgnth.apps.googleusercontent.com', + 'AZUL_ENABLE_MIRRORING': '1', + 'azul_slack_integration': json.dumps({ 'workspace_id': 'T09P9H91S', # ucsc-gi.slack.com 'channel_id': 'C04K4BQET7G' # #team-boardwalk-anvildev