Skip to content

Commit b57682a

Browse files
committed
[u 6/6] Configure mirroring per source (#7066)
1 parent 46d5ca0 commit b57682a

File tree

11 files changed

+74
-23
lines changed

11 files changed

+74
-23
lines changed

deployments/anvilbox/environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
is_sandbox = True
1010

1111
pop = 1 # remove snapshot
12+
no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots)
1213

1314
type ProjectName = str
1415
type SourceSpec = str
@@ -42,8 +43,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'],
4243
google_project,
4344
snapshot,
4445
]),
45-
{}
46+
{
47+
'mirror': not (flags & no_mirror),
48+
}
4649
)
50+
4751
return project, source
4852

4953

deployments/anvildev/environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
)
88

99
pop = 1 # remove snapshot
10+
no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots)
1011

1112
type ProjectName = str
1213
type SourceSpec = str
@@ -40,8 +41,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'],
4041
google_project,
4142
snapshot,
4243
]),
43-
{}
44+
{
45+
'mirror': not (flags & no_mirror),
46+
}
4447
)
48+
4549
return project, source
4650

4751

deployments/anvilprod/environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010

1111
pop = 1 # remove snapshot
12+
no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots)
1213

1314
type ProjectName = str
1415
type SourceSpec = str
@@ -43,8 +44,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'],
4344
google_project,
4445
snapshot,
4546
]),
46-
{}
47+
{
48+
'mirror': not (flags & no_mirror),
49+
}
4750
)
51+
4852
return project, source
4953

5054

deployments/dev/environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
)
88

99
pop = 1 # remove snapshot
10+
no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots)
1011

1112
type ProjectName = str
1213
type SourceSpec = str
@@ -29,8 +30,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'],
2930
google_project,
3031
snapshot,
3132
]),
32-
{}
33+
{
34+
'mirror': not (flags & no_mirror),
35+
}
3336
)
37+
3438
return project, source
3539

3640

deployments/hammerbox/environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
is_sandbox = True
1212

1313
pop = 1 # remove snapshot
14+
no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots)
1415

1516
type ProjectName = str
1617
type SourceSpec = str
@@ -45,8 +46,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'],
4546
google_project,
4647
snapshot,
4748
]),
48-
{}
49+
{
50+
'mirror': not (flags & no_mirror),
51+
}
4952
)
53+
5054
return project, source
5155

5256

deployments/prod/environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
)
1010

1111
pop = 1 # remove snapshot
12+
no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots)
1213

1314
type ProjectName = str
1415
type SourceSpec = str
@@ -31,8 +32,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'],
3132
google_project,
3233
snapshot,
3334
]),
34-
{}
35+
{
36+
'mirror': not (flags & no_mirror),
37+
}
3538
)
39+
3640
return project, source
3741

3842

deployments/sandbox/environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
is_sandbox = True
1010

1111
pop = 1 # remove snapshot
12+
no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots)
1213

1314
type ProjectName = str
1415
type SourceSpec = str
@@ -31,8 +32,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'],
3132
google_project,
3233
snapshot,
3334
]),
34-
{}
35+
{
36+
'mirror': not (flags & no_mirror),
37+
}
3538
)
39+
3640
return project, source
3741

3842

deployments/tempdev/environment.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
)
88

99
pop = 1 # remove snapshot
10+
no_mirror = 2 # do not mirror files from snapshot (redundant for managed access snapshots)
1011

1112
type ProjectName = str
1213
type SourceSpec = str
@@ -29,8 +30,11 @@ def mksrc(source_type: Literal['bigquery', 'parquet'],
2930
google_project,
3031
snapshot,
3132
]),
32-
{}
33+
{
34+
'mirror': not (flags & no_mirror),
35+
}
3336
)
37+
3438
return project, source
3539

3640

src/azul/azulclient.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import fnmatch
2+
import logging
3+
import uuid
14
from collections import (
25
defaultdict,
36
)
@@ -11,18 +14,15 @@
1114
from enum import (
1215
auto,
1316
)
14-
import fnmatch
1517
from functools import (
1618
partial,
1719
)
18-
import logging
20+
from itertools import starmap
1921
from pprint import (
2022
PrettyPrinter,
2123
)
2224
from typing import (
23-
AbstractSet,
24-
)
25-
import uuid
25+
AbstractSet, )
2626

2727
import attrs
2828
import requests
@@ -389,15 +389,22 @@ def is_queue_empty(self, queue_name: str) -> bool:
389389
length, _ = self.queues.get_queue_lengths(queues)
390390
return length == 0
391391

392-
def remote_mirror(self, catalog: CatalogName, sources: Iterable[SourceRef]):
392+
def remote_mirror(self,
393+
catalog: CatalogName,
394+
sources: Iterable[tuple[SourceRef, SourceConfig]]):
393395

394-
def message(source: SourceRef):
395-
log.info('Mirroring files in source %r from catalog %r',
396-
str(source.spec), catalog)
397-
return self.mirror_source_message(catalog, source)
396+
def messages():
397+
for source, cfg in sources:
398+
if cfg.mirror:
399+
log.info('Mirroring files in source %r from catalog %r',
400+
str(source.spec), catalog)
401+
yield self.mirror_source_message(catalog, source)
402+
else:
403+
log.info('Not mirroring any files in source %r from catalog %r because '
404+
'mirroring is explicitly disabled',
405+
str(source.spec), catalog)
398406

399-
messages = map(message, sources)
400-
self.queue_mirror_messages(messages)
407+
self.queue_mirror_messages(messages())
401408

402409
def _get_non_empty_fail_queues(self) -> set[str]:
403410
return {

src/azul/indexer/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,7 @@ class SourceConfig(SerializableAttrs):
412412
"""
413413
Configuration
414414
"""
415-
pass
415+
mirror: bool
416416

417417

418418
@attrs.frozen(kw_only=True, order=True)

0 commit comments

Comments
 (0)