Skip to content

Commit 827e42d

Browse files
Update post review
1 parent 3654def commit 827e42d

File tree

2 files changed

+13
-14
lines changed

2 files changed

+13
-14
lines changed

python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ def airbyte_assets(
4545
4646
@airbyte_assets(
4747
connection_id="airbyte_connection_id",
48-
name="airbyte_connection_id",
49-
group_name="airbyte_connection_id",
5048
workspace=airbyte_workspace,
5149
)
5250
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
@@ -74,8 +72,8 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt
7472
class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
7573
def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
7674
default_spec = super().get_asset_spec(props)
77-
return default_spec.replace_attributes(
78-
key=asset_spec.key.with_prefix("my_prefix"),
75+
return default_spec.merge_attributes(
76+
metadata={"custom": "metadata"},
7977
)
8078
8179
airbyte_workspace = AirbyteCloudWorkspace(
@@ -87,8 +85,6 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
8785
8886
@airbyte_assets(
8987
connection_id="airbyte_connection_id",
90-
name="airbyte_connection_id",
91-
group_name="airbyte_connection_id",
9288
workspace=airbyte_workspace,
9389
dagster_airbyte_translator=CustomDagsterAirbyteTranslator()
9490
)
@@ -104,7 +100,7 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt
104100
return multi_asset(
105101
name=name,
106102
group_name=group_name,
107-
can_subset=False,
103+
can_subset=True,
108104
specs=[
109105
spec
110106
for spec in workspace.load_asset_specs(

python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,8 +1102,8 @@ def build_airbyte_assets_definitions(
11021102
class CustomDagsterAirbyteTranslator(DagsterAirbyteTranslator):
11031103
def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
11041104
default_spec = super().get_asset_spec(props)
1105-
return default_spec.replace_attributes(
1106-
key=asset_spec.key.with_prefix("my_prefix"),
1105+
return default_spec.merge_attributes(
1106+
metadata={"custom": "metadata"},
11071107
)
11081108
11091109
airbyte_workspace = AirbyteCloudWorkspace(
@@ -1129,19 +1129,22 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
11291129
dagster_airbyte_translator=dagster_airbyte_translator
11301130
)
11311131

1132-
connection_ids = {
1133-
check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id)
1132+
connections = {
1133+
(
1134+
check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id),
1135+
check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_name),
1136+
)
11341137
for spec in all_asset_specs
11351138
}
11361139

11371140
_asset_fns = []
1138-
for connection_id in connection_ids:
1141+
for connection_id, connection_name in connections:
11391142

11401143
@airbyte_assets(
11411144
connection_id=connection_id,
11421145
workspace=workspace,
1143-
name=_clean_name(connection_id),
1144-
group_name=_clean_name(connection_id),
1146+
name=_clean_name(connection_name),
1147+
group_name=_clean_name(connection_name),
11451148
dagster_airbyte_translator=dagster_airbyte_translator,
11461149
)
11471150
def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace):

0 commit comments

Comments
 (0)