Skip to content

Commit c717f03

Browse files
[14/n][dagster-airbyte] Implement AirbyteCloudWorkspace.sync_and_poll
1 parent 3641abe commit c717f03

File tree

10 files changed

+345
-105
lines changed

10 files changed

+345
-105
lines changed

python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_decorator.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from dagster_airbyte.resources import AirbyteCloudWorkspace
77
from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator
8+
from dagster_airbyte.utils import DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY
89

910

1011
@experimental
@@ -101,14 +102,18 @@ def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: Airbyt
101102
resources={"airbyte": airbyte_workspace},
102103
)
103104
"""
105+
dagster_airbyte_translator = dagster_airbyte_translator or DagsterAirbyteTranslator()
106+
104107
return multi_asset(
105108
name=name,
106109
group_name=group_name,
107-
can_subset=False,
110+
can_subset=True,
108111
specs=[
109-
spec
112+
spec.merge_attributes(
113+
metadata={DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY: dagster_airbyte_translator}
114+
)
110115
for spec in workspace.load_asset_specs(
111-
dagster_airbyte_translator=dagster_airbyte_translator or DagsterAirbyteTranslator()
116+
dagster_airbyte_translator=dagster_airbyte_translator
112117
)
113118
if AirbyteMetadataSet.extract(spec.metadata).connection_id == connection_id
114119
],

python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import hashlib
22
import inspect
33
import os
4-
import re
54
from abc import abstractmethod
65
from functools import partial
76
from itertools import chain
@@ -57,6 +56,7 @@
5756
from dagster_airbyte.translator import AirbyteMetadataSet, DagsterAirbyteTranslator
5857
from dagster_airbyte.types import AirbyteTableMetadata
5958
from dagster_airbyte.utils import (
59+
clean_name,
6060
generate_materializations,
6161
generate_table_schema,
6262
is_basic_normalization_operation,
@@ -471,11 +471,6 @@ def _get_normalization_tables_for_schema(
471471
return out
472472

473473

474-
def _clean_name(name: str) -> str:
475-
"""Cleans an input to be a valid Dagster asset name."""
476-
return re.sub(r"[^a-z0-9]+", "_", name.lower())
477-
478-
479474
class AirbyteConnectionMetadata(
480475
NamedTuple(
481476
"_AirbyteConnectionMetadata",
@@ -917,7 +912,7 @@ def load_assets_from_airbyte_instance(
917912
workspace_id: Optional[str] = None,
918913
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
919914
create_assets_for_normalization_tables: bool = True,
920-
connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name,
915+
connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = clean_name,
921916
connection_meta_to_group_fn: Optional[
922917
Callable[[AirbyteConnectionMetadata], Optional[str]]
923918
] = None,
@@ -1022,7 +1017,7 @@ def load_assets_from_airbyte_instance(
10221017
check.invariant(
10231018
not connection_meta_to_group_fn
10241019
or not connection_to_group_fn
1025-
or connection_to_group_fn == _clean_name,
1020+
or connection_to_group_fn == clean_name,
10261021
"Cannot specify both connection_meta_to_group_fn and connection_to_group_fn",
10271022
)
10281023

@@ -1140,8 +1135,8 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
11401135
@airbyte_assets(
11411136
connection_id=connection_id,
11421137
workspace=workspace,
1143-
name=_clean_name(connection_id),
1144-
group_name=_clean_name(connection_id),
1138+
name=clean_name(connection_id),
1139+
group_name=clean_name(connection_id),
11451140
dagster_airbyte_translator=dagster_airbyte_translator,
11461141
)
11471142
def _asset_fn(context: AssetExecutionContext, airbyte: AirbyteCloudWorkspace):

python_modules/libraries/dagster-airbyte/dagster_airbyte/managed/reconciliation.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
from dagster_airbyte.asset_defs import (
3737
AirbyteConnectionMetadata,
3838
AirbyteInstanceCacheableAssetsDefinition,
39-
_clean_name,
4039
)
4140
from dagster_airbyte.managed.types import (
4241
MANAGED_ELEMENTS_DEPRECATION_MSG,
@@ -50,7 +49,7 @@
5049
InitializedAirbyteSource,
5150
)
5251
from dagster_airbyte.resources import AirbyteResource
53-
from dagster_airbyte.utils import is_basic_normalization_operation
52+
from dagster_airbyte.utils import is_basic_normalization_operation, clean_name
5453

5554

5655
def gen_configured_stream_json(
@@ -746,7 +745,7 @@ def load_assets_from_connections(
746745
connections: Iterable[AirbyteConnection],
747746
key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
748747
create_assets_for_normalization_tables: bool = True,
749-
connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name,
748+
connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = clean_name,
750749
connection_meta_to_group_fn: Optional[
751750
Callable[[AirbyteConnectionMetadata], Optional[str]]
752751
] = None,
@@ -821,7 +820,7 @@ def load_assets_from_connections(
821820
check.invariant(
822821
not connection_meta_to_group_fn
823822
or not connection_to_group_fn
824-
or connection_to_group_fn == _clean_name,
823+
or connection_to_group_fn == clean_name,
825824
"Cannot specify both connection_meta_to_group_fn and connection_to_group_fn",
826825
)
827826

python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
import requests
1212
from dagster import (
1313
AssetExecutionContext,
14+
AssetMaterialization,
1415
ConfigurableResource,
1516
Definitions,
1617
Failure,
1718
InitResourceContext,
19+
MaterializeResult,
1820
OpExecutionContext,
1921
_check as check,
2022
get_dagster_logger,
@@ -34,13 +36,19 @@
3436

3537
from dagster_airbyte.translator import (
3638
AirbyteConnection,
39+
AirbyteConnectionTableProps,
3740
AirbyteDestination,
3841
AirbyteJob,
3942
AirbyteJobStatusType,
43+
AirbyteMetadataSet,
4044
AirbyteWorkspaceData,
4145
DagsterAirbyteTranslator,
4246
)
4347
from dagster_airbyte.types import AirbyteOutput
48+
from dagster_airbyte.utils import (
49+
get_airbyte_connection_table_name,
50+
get_translator_from_airbyte_assets,
51+
)
4452

4553
AIRBYTE_REST_API_BASE = "https://api.airbyte.com"
4654
AIRBYTE_REST_API_VERSION = "v1"
@@ -1211,10 +1219,91 @@ def load_asset_specs(
12111219
workspace=self, dagster_airbyte_translator=dagster_airbyte_translator
12121220
)
12131221

1222+
def _generate_materialization(
1223+
self,
1224+
airbyte_output: AirbyteOutput,
1225+
dagster_airbyte_translator: DagsterAirbyteTranslator,
1226+
):
1227+
connection = AirbyteConnection.from_connection_details(
1228+
connection_details=airbyte_output.connection_details
1229+
)
1230+
1231+
for stream in connection.streams.values():
1232+
if stream.selected:
1233+
connection_table_name = get_airbyte_connection_table_name(
1234+
stream_prefix=connection.stream_prefix,
1235+
stream_name=stream.name,
1236+
)
1237+
stream_asset_spec = dagster_airbyte_translator.get_asset_spec(
1238+
props=AirbyteConnectionTableProps(
1239+
table_name=connection_table_name,
1240+
stream_prefix=connection.stream_prefix,
1241+
stream_name=stream.name,
1242+
json_schema=stream.json_schema,
1243+
connection_id=connection.id,
1244+
connection_name=connection.name,
1245+
destination_type=None,
1246+
database=None,
1247+
schema=None,
1248+
)
1249+
)
1250+
1251+
yield AssetMaterialization(
1252+
asset_key=stream_asset_spec.key,
1253+
description=(
1254+
f"Table generated via Airbyte Cloud sync "
1255+
f"for connection {connection.name}: {connection_table_name}"
1256+
),
1257+
metadata=stream_asset_spec.metadata,
1258+
)
1259+
12141260
def sync_and_poll(
1215-
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
1261+
self, context: Union[OpExecutionContext, AssetExecutionContext]
12161262
):
1217-
raise NotImplementedError()
1263+
"""Executes a sync and poll process to materialize Airbyte Cloud assets.
1264+
1265+
Args:
1266+
context (Union[OpExecutionContext, AssetExecutionContext]): The execution context
1267+
from within `@airbyte_assets`. If an AssetExecutionContext is passed,
1268+
its underlying OpExecutionContext will be used.
1269+
1270+
Returns:
1271+
Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult
1272+
or AssetMaterialization.
1273+
"""
1274+
assets_def = context.assets_def
1275+
dagster_airbyte_translator = get_translator_from_airbyte_assets(assets_def)
1276+
connection_id = next(
1277+
check.not_none(AirbyteMetadataSet.extract(spec.metadata).connection_id)
1278+
for spec in assets_def.specs
1279+
)
1280+
1281+
client = self.get_client()
1282+
airbyte_output = client.sync_and_poll(
1283+
connection_id=connection_id,
1284+
)
1285+
1286+
materialized_asset_keys = set()
1287+
for materialization in self._generate_materialization(
1288+
airbyte_output=airbyte_output, dagster_airbyte_translator=dagster_airbyte_translator
1289+
):
1290+
# Scan through all tables actually created, if it was expected then emit a MaterializeResult.
1291+
# Otherwise, emit a runtime AssetMaterialization.
1292+
if materialization.asset_key in context.selected_asset_keys:
1293+
yield MaterializeResult(
1294+
asset_key=materialization.asset_key, metadata=materialization.metadata
1295+
)
1296+
materialized_asset_keys.add(materialization.asset_key)
1297+
else:
1298+
context.log.warning(
1299+
f"An unexpected asset was materialized: {materialization.asset_key}. "
1300+
f"Yielding a materialization event."
1301+
)
1302+
yield materialization
1303+
1304+
unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
1305+
if unmaterialized_asset_keys:
1306+
context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
12181307

12191308

12201309
@experimental

python_modules/libraries/dagster-airbyte/dagster_airbyte/translator.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class AirbyteConnectionTableProps:
4141
json_schema: Mapping[str, Any]
4242
connection_id: str
4343
connection_name: str
44-
destination_type: str
44+
destination_type: Optional[str]
4545
database: Optional[str]
4646
schema: Optional[str]
4747

@@ -231,5 +231,5 @@ def get_asset_spec(self, props: AirbyteConnectionTableProps) -> AssetSpec:
231231
return AssetSpec(
232232
key=AssetKey(props.table_name),
233233
metadata=metadata,
234-
kinds={"airbyte", props.destination_type},
234+
kinds={"airbyte", *({props.destination_type} if props.destination_type else set())},
235235
)

python_modules/libraries/dagster-airbyte/dagster_airbyte/utils.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,26 @@
1-
from typing import Any, Iterator, Mapping, Optional, Sequence
1+
import re
2+
from typing import TYPE_CHECKING, Any, Iterator, Mapping, Optional, Sequence
23

3-
from dagster import AssetMaterialization, MetadataValue
4+
from dagster import (
5+
AssetMaterialization,
6+
AssetsDefinition,
7+
DagsterInvariantViolationError,
8+
MetadataValue,
9+
)
410
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
511

612
from dagster_airbyte.types import AirbyteOutput
713

14+
if TYPE_CHECKING:
15+
from dagster_airbyte import DagsterAirbyteTranslator
16+
17+
DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY = "dagster-airbyte/dagster_airbyte_translator"
18+
19+
20+
def clean_name(name: str) -> str:
21+
"""Cleans an input to be a valid Dagster asset name."""
22+
return re.sub(r"[^a-z0-9]+", "_", name.lower())
23+
824

925
def get_airbyte_connection_table_name(stream_prefix: Optional[str], stream_name: str) -> str:
1026
return f"{stream_prefix if stream_prefix else ''}{stream_name}"
@@ -78,3 +94,18 @@ def generate_materializations(
7894
all_stream_stats.get(stream_name, {}),
7995
asset_key_prefix=asset_key_prefix,
8096
)
97+
98+
99+
def get_translator_from_airbyte_assets(
100+
airbyte_assets: AssetsDefinition,
101+
) -> "DagsterAirbyteTranslator":
102+
metadata_by_key = airbyte_assets.metadata_by_key or {}
103+
first_asset_key = next(iter(airbyte_assets.metadata_by_key.keys()))
104+
first_metadata = metadata_by_key.get(first_asset_key, {})
105+
dagster_airbyte_translator = first_metadata.get(DAGSTER_AIRBYTE_TRANSLATOR_METADATA_KEY)
106+
if dagster_airbyte_translator is None:
107+
raise DagsterInvariantViolationError(
108+
f"Expected to find airbyte translator metadata on asset {first_asset_key.to_user_string()},"
109+
" but did not. Did you pass in assets that weren't generated by @airbyte_assets?"
110+
)
111+
return dagster_airbyte_translator

0 commit comments

Comments
 (0)