Skip to content

Commit bd78fc9

Browse files
committed
[u r 2/2] Configure mirroring per catalog and source (#7066)
1 parent a325a0d commit bd78fc9

File tree

8 files changed

+61
-15
lines changed

8 files changed

+61
-15
lines changed

deployments/anvildev/environment.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ def env() -> Mapping[str, Optional[str]]:
108108
'AZUL_CATALOGS': json.dumps({
109109
f'{catalog}{suffix}': dict(atlas=atlas,
110110
internal=internal,
111+
mirror_max_file_size=1.5 * 1024 ** 3,
111112
plugins=dict(metadata=dict(name='anvil'),
112113
repository=dict(name='tdr_anvil')),
113114
sources=list(filter(None, sources.values())))

deployments/dev/environment.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,15 @@ def mkdict(previous_catalog: dict[str, str],
182182
]))
183183

184184

185+
def mirror_max_file_size(atlas: str, internal: bool) -> int | None:
186+
if atlas == 'hca':
187+
return 1.5 * 1024 ** 3
188+
elif atlas == 'lungmap':
189+
return -1
190+
else:
191+
assert False, atlas
192+
193+
185194
def env() -> Mapping[str, Optional[str]]:
186195
"""
187196
Returns a dictionary that maps environment variable names to values. The
@@ -217,6 +226,7 @@ def env() -> Mapping[str, Optional[str]]:
217226
'AZUL_CATALOGS': json.dumps({
218227
f'{catalog}{suffix}': dict(atlas=atlas,
219228
internal=internal,
229+
mirror_max_file_size=mirror_max_file_size(atlas, internal),
220230
plugins=dict(metadata=dict(name='hca'),
221231
repository=dict(name='tdr_hca')),
222232
sources=list(filter(None, sources.values())))

deployments/hammerbox/environment.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,6 +1075,7 @@ def env() -> Mapping[str, Optional[str]]:
10751075
'AZUL_CATALOGS': json.dumps({
10761076
f'{catalog}{suffix}': dict(atlas=atlas,
10771077
internal=internal,
1078+
mirror_max_file_size=1.5 * 1024 ** 3,
10781079
plugins=dict(metadata=dict(name='anvil'),
10791080
repository=dict(name='tdr_anvil')),
10801081
sources=list(filter(None, sources.values())))

deployments/prod/environment.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1763,6 +1763,15 @@ def mkdict(previous_catalog: dict[str, str],
17631763
]))
17641764

17651765

1766+
def mirror_max_file_size(atlas: str, internal: bool) -> int | None:
1767+
if atlas == 'hca':
1768+
return 1.5 * 1024 ** 3 if internal else None
1769+
elif atlas == 'lungmap':
1770+
return -1
1771+
else:
1772+
assert False, atlas
1773+
1774+
17661775
def env() -> Mapping[str, Optional[str]]:
17671776
"""
17681777
Returns a dictionary that maps environment variable names to values. The
@@ -1797,6 +1806,7 @@ def env() -> Mapping[str, Optional[str]]:
17971806
'AZUL_CATALOGS': base64.b64encode(bz2.compress(json.dumps({
17981807
f'{catalog}{suffix}': dict(atlas=atlas,
17991808
internal=internal,
1809+
mirror_max_file_size=mirror_max_file_size(atlas, internal),
18001810
plugins=dict(metadata=dict(name='hca'),
18011811
repository=dict(name='tdr_hca')),
18021812
sources=mklist(sources))

deployments/sandbox/environment.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,15 @@ def mkdict(previous_catalog: dict[str, str],
184184
]))
185185

186186

187+
def mirror_max_file_size(atlas: str, internal: bool) -> int | None:
188+
if atlas == 'hca':
189+
return 1.5 * 1024 ** 3
190+
elif atlas == 'lungmap':
191+
return -1
192+
else:
193+
assert False, atlas
194+
195+
187196
def env() -> Mapping[str, Optional[str]]:
188197
"""
189198
Returns a dictionary that maps environment variable names to values. The
@@ -230,6 +239,7 @@ def env() -> Mapping[str, Optional[str]]:
230239
'AZUL_CATALOGS': json.dumps({
231240
f'{catalog}{suffix}': dict(atlas=atlas,
232241
internal=internal,
242+
mirror_max_file_size=mirror_max_file_size(atlas, internal),
233243
plugins=dict(metadata=dict(name='hca'),
234244
repository=dict(name='tdr_hca')),
235245
sources=list(filter(None, sources.values())))

environment.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def env() -> Mapping[str, Optional[str]]:
4747
# 'name': {
4848
# 'atlas': 'bar',
4949
# 'internal': True,
50+
# 'mirror_max_file_size': -1,
5051
# 'plugins': {
5152
# plugin_type: {'name'=plugin_package},
5253
# plugin_type: {'name'=plugin_package},
@@ -68,6 +69,10 @@ def env() -> Mapping[str, Optional[str]]:
6869
# `plugin_package` denotes the concrete implementation of how to fulfill
6970
# that purpose.
7071
#
72+
# Files larger than `mirror_max_file_size` will not be mirrored for
73+
# that catalog. A negative value results in no files being mirrored for
74+
# that catalog, whereas an absent or null value imposes no size limit.
75+
#
7176
# The first catalog listed is the default catalog.
7277
#
7378
# A source represents a TDR snapshot or canned staging area to index.

src/azul/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
JSON,
6464
MutableJSON,
6565
json_bool,
66+
json_int,
6667
json_mapping,
6768
json_sequence,
6869
json_str,
@@ -883,6 +884,7 @@ class Catalog:
883884
Config.Catalog(name='dcp',
884885
atlas='hca',
885886
internal=False,
887+
mirror_max_file_size=None,
886888
plugins={'metadata': Config.Catalog.Plugin(name='hca'),
887889
'repository': Config.Catalog.Plugin(name='tdr_hca')},
888890
sources=set())
@@ -918,6 +920,7 @@ def from_json(cls, spec: JSON) -> Self:
918920
name: str
919921
atlas: str
920922
internal: bool
923+
mirror_max_file_size: int | None
921924
plugins: Mapping[str, Plugin]
922925
sources: Set[str]
923926

@@ -981,9 +984,15 @@ def from_json(cls, name: str, spec: JSON) -> Self:
981984
plugin_type: cls.Plugin.from_json(json_mapping(plugin_spec))
982985
for plugin_type, plugin_spec in json_mapping(spec['plugins']).items()
983986
}
987+
mirror_max_file_size = spec.get('mirror_max_file_size')
984988
return cls(name=name,
985989
atlas=json_str(spec['atlas']),
986990
internal=json_bool(spec['internal']),
991+
mirror_max_file_size=(
992+
None
993+
if mirror_max_file_size is None
994+
else json_int(mirror_max_file_size)
995+
),
987996
plugins=plugins,
988997
sources=set(map(json_str, json_sequence(spec['sources']))))
989998

src/azul/indexer/mirror_controller.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -130,17 +130,21 @@ def mirror_source(self, catalog: CatalogName, source_json: JSON):
130130
source = plugin.source_ref_cls.from_json(source_json)
131131
assert source.id in plugin.list_source_ids(authentication=None), R(
132132
'Cannot mirror non-public source', source)
133-
source = plugin.partition_source_for_mirroring(catalog, source)
134-
prefix = source.spec.prefix
135-
log.info('Queueing %d partitions of source %r in catalog %r',
136-
prefix.num_partitions, str(source.spec), catalog)
133+
if 'no_mirror' in source.spec.flags:
134+
log.info('Not mirroring source % r in catalog %r because `no_mirror` flag is present',
135+
str(source.spec), str(catalog))
136+
else:
137+
source = plugin.partition_source_for_mirroring(catalog, source)
138+
prefix = source.spec.prefix
139+
log.info('Queueing %d partitions of source %r in catalog %r',
140+
prefix.num_partitions, str(source.spec), catalog)
137141

138-
def message(partition: str) -> SQSMessage:
139-
log.debug('Queueing partition %r', partition)
140-
return self.mirror_partition_message(catalog, source, partition)
142+
def message(partition: str) -> SQSMessage:
143+
log.debug('Queueing partition %r', partition)
144+
return self.mirror_partition_message(catalog, source, partition)
141145

142-
messages = map(message, prefix.partition_prefixes())
143-
self.client.queue_mirror_messages(messages)
146+
messages = map(message, prefix.partition_prefixes())
147+
self.client.queue_mirror_messages(messages)
144148

145149
def mirror_partition(self,
146150
catalog: CatalogName,
@@ -167,13 +171,9 @@ def mirror_file(self,
167171
file = self.load_file(catalog, file_json)
168172
assert file.size is not None, R('File size unknown', file)
169173

170-
file_is_large = file.size > 1.5 * 1024 ** 3
171-
deployment_is_stable = (config.deployment.is_stable
172-
and not config.deployment.is_unit_test
173-
and catalog not in config.integration_test_catalogs)
174-
174+
max_file_size = config.catalogs[catalog].mirror_max_file_size
175175
service = self.service(catalog)
176-
if file_is_large and not deployment_is_stable:
176+
if max_file_size is not None and file.size > max_file_size:
177177
log.info('Not mirroring file to save cost: %r', file)
178178
elif service.info_exists(file):
179179
log.info('File is already mirrored, skipping upload: %r', file)

0 commit comments

Comments
 (0)