Skip to content

Commit 81888a7

Browse files
committed
[u 6/6] Configure mirroring per source (#7066)
1 parent dec55f8 commit 81888a7

File tree

3 files changed

+40
-23
lines changed

3 files changed

+40
-23
lines changed

src/azul/indexer/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
)
1111
import json
1212
import logging
13+
1314
import math
1415
from typing import (
1516
Any,
@@ -412,7 +413,7 @@ class SourceConfig(SerializableAttrs):
412413
"""
413414
Configuration
414415
"""
415-
pass
416+
mirror: bool = attrs.field(default=True)
416417

417418

418419
@attrs.frozen(kw_only=True, order=True)

src/azul/indexer/mirror_controller.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -136,28 +136,32 @@ def mirror_source(self, catalog: CatalogName, source_json: JSON):
136136
source = plugin.source_ref_cls.from_json(source_json)
137137
assert source.id in plugin.list_source_ids(authentication=None), R(
138138
'Cannot mirror non-public source', source)
139-
# The desired partition size depends on the maximum number of messages
140-
# we can send in one Lambda invocation, because queueing the individual
141-
# mirror_file messages turns out to dominate the running time of
142-
# handling a mirror_source message.
143-
partition_size = int(
144-
aws.sqs_fifo_rate_limit # max. # of SendMessage calls per second
145-
* self.client.queues.batch_size # number of messages per call
146-
* config.mirror_lambda_timeout # max. duration of the invocation
147-
/ config.mirroring_concurrency # number of concurrent invocations
148-
/ 2 # safety margin
149-
)
150-
source = plugin.partition_source_for_mirroring(catalog, source, partition_size)
151-
prefix = source.prefix
152-
log.info('Queueing %d partitions of source %r in catalog %r',
153-
prefix.num_partitions, str(source.spec), catalog)
139+
if not source.config.mirror:
140+
log.info('Not mirroring source % r in catalog %r because `no_mirror` flag is present',
141+
str(source.spec), str(catalog))
142+
else:
143+
# The desired partition size depends on the maximum number of messages
144+
# we can send in one Lambda invocation, because queueing the individual
145+
# mirror_file messages turns out to dominate the running time of
146+
# handling a mirror_source message.
147+
partition_size = int(
148+
aws.sqs_fifo_rate_limit # max. # of SendMessage calls per second
149+
* self.client.queues.batch_size # number of messages per call
150+
* config.mirror_lambda_timeout # max. duration of the invocation
151+
/ config.mirroring_concurrency # number of concurrent invocations
152+
/ 2 # safety margin
153+
)
154+
source = plugin.partition_source_for_mirroring(catalog, source, partition_size)
155+
prefix = source.prefix
156+
log.info('Queueing %d partitions of source %r in catalog %r',
157+
prefix.num_partitions, str(source.spec), catalog)
154158

155-
def message(partition: str) -> SQSMessage:
156-
log.debug('Queueing partition %r', partition)
157-
return self.mirror_partition_message(catalog, source, partition)
159+
def message(partition: str) -> SQSMessage:
160+
log.debug('Queueing partition %r', partition)
161+
return self.mirror_partition_message(catalog, source, partition)
158162

159-
messages = map(message, prefix.partition_prefixes())
160-
self.client.queue_mirror_messages(messages)
163+
messages = map(message, prefix.partition_prefixes())
164+
self.client.queue_mirror_messages(messages)
161165

162166
def mirror_partition(self,
163167
catalog: CatalogName,

test/integration_test.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,8 @@ def managed_access_sources_by_catalog(self
296296
def _select_source(self,
297297
catalog: CatalogName,
298298
*,
299-
public: bool | None = None
299+
public: bool | None = None,
300+
mirror: bool = False,
300301
) -> SourceRef | None:
301302
"""
302303
Choose an indexed source at random.
@@ -309,6 +310,11 @@ def _select_source(self,
309310
public sources. If false, choose a non-public source, or
310311
return `None` if the catalog contains no non-public
311312
sources.
313+
314+
:param mirror: If true, choose a source where the `no_mirror` flag is
315+
not present, or return `None` if the catalog contains no
316+
such source. If false, choose a source regardless of
317+
whether this flag is present.
312318
"""
313319
plugin = self.repository_plugin(catalog)
314320
sources = set(plugin.sources)
@@ -327,6 +333,12 @@ def _select_source(self,
327333
sources &= ma_sources
328334
else:
329335
assert False, public
336+
if mirror:
337+
sources = {
338+
source
339+
for source in sources
340+
if 'no_mirror' not in SourceSpec.parse_flags_only(source)
341+
}
330342
if len(sources) == 0:
331343
assert public is False, 'An IT catalog must contain at least one public source'
332344
return None
@@ -1710,7 +1722,7 @@ def _test_mirroring(self, *, delete: bool):
17101722
if c.is_integration_test_catalog and c.mirror_max_file_size >= 0
17111723
]
17121724
sources_by_catalog = {
1713-
catalog: [self._select_source(catalog, public=True)]
1725+
catalog: [self._select_source(catalog, public=True, mirror=True)]
17141726
for catalog in catalogs
17151727
}
17161728

0 commit comments

Comments
 (0)