Skip to content

Commit 40057f9

Browse files
Move translator as metadata to specs loader
1 parent 28823ef commit 40057f9

File tree

2 files changed

+17
-12
lines changed

2 files changed

+17
-12
lines changed

python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
from dagster_airbyte.resources import AirbyteCloudWorkspace
77
from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator
8-
from dagster_airbyte.utils import DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY
98

109

1110
@experimental
@@ -105,9 +104,7 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt
105104
group_name=group_name,
106105
can_subset=True,
107106
specs=[
108-
spec.merge_attributes(
109-
metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator}
110-
)
107+
spec
111108
for spec in workspace.load_asset_specs(
112109
dagster_airbyte_translator=dagster_airbyte_translator
113110
)

python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
)
4646
from dagster_airbyte.types import AirbyteOutput
4747
from dagster_airbyte.utils import (
48+
DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY,
4849
get_airbyte_connection_table_name,
4950
get_translator_from_airbyte_assets,
5051
)
@@ -1339,16 +1340,23 @@ def load_airbyte_cloud_asset_specs(
13391340
airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_cloud_workspace)
13401341
defs = dg.Definitions(assets=airbyte_cloud_specs)
13411342
"""
1343+
dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator()
1344+
13421345
with workspace.process_config_and_initialize_cm() as initialized_workspace:
1343-
return check.is_list(
1344-
AirbyteCloudWorkspaceDefsLoader(
1345-
workspace=initialized_workspace,
1346-
translator=dagster_airbyte_translator or DagsterAirbyteTranslator(),
1346+
return [
1347+
spec.merge_attributes(
1348+
metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator}
13471349
)
1348-
.build_defs()
1349-
.assets,
1350-
AssetSpec,
1351-
)
1350+
for spec in check.is_list(
1351+
AirbyteCloudWorkspaceDefsLoader(
1352+
workspace=initialized_workspace,
1353+
translator=dagster_airbyte_translator,
1354+
)
1355+
.build_defs()
1356+
.assets,
1357+
AssetSpec,
1358+
)
1359+
]
13521360

13531361

13541362
@record

0 commit comments

Comments
 (0)