Commit dafcd9b

[docs] Add docs for reworked Airbyte Cloud API
1 parent 98fd3ad commit dafcd9b

7 files changed

+312
-0
lines changed

Lines changed: 194 additions & 0 deletions
@@ -0,0 +1,194 @@
---
title: "Using Dagster with Airbyte Cloud"
description: Represent your Airbyte Cloud connections in Dagster
---

# Using Dagster with Airbyte Cloud

This guide explains how to use Dagster with Airbyte Cloud through the `dagster-airbyte` library. Your Airbyte Cloud connection tables can be represented as assets in the Dagster asset graph, allowing you to track lineage and dependencies between Airbyte Cloud assets and the data assets you already model in Dagster. You can also use Dagster to orchestrate Airbyte Cloud connections, triggering syncs on a cadence or in response to upstream data changes.

## What you'll learn

- How to represent Airbyte Cloud assets in the Dagster asset graph, including lineage to other Dagster assets.
- How to customize asset definition metadata for these Airbyte Cloud assets.
- How to materialize Airbyte Cloud connection tables from Dagster.
- How to customize how Airbyte Cloud connection tables are materialized.

<details>
<summary>Prerequisites</summary>

- The `dagster` and `dagster-airbyte` libraries installed in your environment
- Familiarity with asset definitions and the Dagster asset graph
- Familiarity with Dagster resources
- Familiarity with Airbyte Cloud concepts, like connections and connection tables
- An Airbyte Cloud workspace
- An Airbyte Cloud client ID and client secret. For more information, see [Configuring API Access](https://docs.airbyte.com/using-airbyte/configuring-api-access) in the Airbyte Cloud REST API documentation.

</details>

## Set up your environment

To get started, you'll need to install the `dagster` and `dagster-airbyte` Python packages:

```bash
pip install dagster dagster-airbyte
```

## Represent Airbyte Cloud assets in the asset graph

To load Airbyte Cloud assets into the Dagster asset graph, you must first construct a <PyObject module="dagster_airbyte" object="AirbyteCloudWorkspace" /> resource, which allows Dagster to communicate with your Airbyte Cloud workspace. You'll need to supply your workspace ID, client ID, and client secret. See [Configuring API Access](https://docs.airbyte.com/using-airbyte/configuring-api-access) in the Airbyte Cloud REST API documentation for more information on how to create your client ID and client secret.

Dagster can automatically load all connection tables from your Airbyte Cloud workspace as asset specs. Call the <PyObject module="dagster_airbyte" method="load_airbyte_cloud_asset_specs" /> function, which returns a list of <PyObject object="AssetSpec" />s representing your Airbyte Cloud assets. You can then include these asset specs in your <PyObject object="Definitions" /> object:

```python file=/integrations/airbyte-cloud/representing_airbyte_cloud_assets.py
from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_workspace)
defs = dg.Definitions(assets=airbyte_cloud_specs)
```

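The workspace resource reads its credentials from three environment variables. As a minimal sketch, assuming you want to fail early when one is missing, a small standard-library helper could check them before Dagster loads the definitions. The helper name `missing_airbyte_env_vars` is hypothetical and not part of `dagster-airbyte`; the variable names are the ones used in this guide.

```python
import os
from typing import List, Mapping, Optional

# Environment variable names used by the AirbyteCloudWorkspace examples in this guide.
REQUIRED_ENV_VARS = (
    "AIRBYTE_CLOUD_WORKSPACE_ID",
    "AIRBYTE_CLOUD_CLIENT_ID",
    "AIRBYTE_CLOUD_CLIENT_SECRET",
)


def missing_airbyte_env_vars(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required variable names that are unset or empty.

    Hypothetical helper for illustration; it is not part of dagster-airbyte.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_airbyte_env_vars()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All Airbyte Cloud credentials are set.")
```

Passing a mapping explicitly keeps the helper easy to test without touching the real environment.
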
### Sync and materialize Airbyte Cloud assets

You can use Dagster to sync Airbyte Cloud connections and materialize Airbyte Cloud connection tables. Use the <PyObject module="dagster_airbyte" method="build_airbyte_assets_definitions" /> factory to create all asset definitions for your Airbyte Cloud workspace.

```python file=/integrations/airbyte-cloud/sync_and_materialize_airbyte_cloud_assets.py
from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

all_airbyte_assets = build_airbyte_assets_definitions(workspace=airbyte_workspace)

defs = dg.Definitions(
    assets=all_airbyte_assets,
    resources={"airbyte": airbyte_workspace},
)
```

### Customize the materialization of Airbyte Cloud assets

To customize the sync of your connections, use the <PyObject module="dagster_airbyte" method="airbyte_assets" /> decorator. This allows you to execute custom code before and after the call to the Airbyte Cloud sync.

```python file=/integrations/airbyte-cloud/customize_airbyte_cloud_asset_defs.py
from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    workspace=airbyte_workspace,
    name="airbyte_connection_name",
    group_name="airbyte_connection_name",
)
def airbyte_connection_assets(
    context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace
):
    # Do something before the materialization...
    yield from airbyte.sync_and_poll(context=context)
    # Do something after the materialization...


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)
```

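The before/after ordering in the decorated function relies on ordinary Python generator semantics: code placed after `yield from` runs only once the delegated generator is exhausted. The sketch below illustrates this with a stand-in generator. `fake_sync_and_poll`, `connection_assets`, and the event strings are hypothetical and do not call Airbyte or Dagster.

```python
from typing import Iterator, List


def fake_sync_and_poll(log: List[str]) -> Iterator[str]:
    """Stand-in for a sync that yields one event per synced table (hypothetical)."""
    for table in ("users", "orders"):
        log.append(f"synced {table}")
        yield f"materialization:{table}"


def connection_assets(log: List[str]) -> Iterator[str]:
    log.append("before sync")           # custom code before the sync
    yield from fake_sync_and_poll(log)  # re-yield every event from the sync
    log.append("after sync")            # runs only after the sync generator finishes


log: List[str] = []
events = list(connection_assets(log))
print(events)
print(log)
```

Because the function is a generator, nothing runs until a consumer iterates it, and if iteration stops early the code after `yield from` does not run.
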
### Customize asset definition metadata for Airbyte Cloud assets

By default, Dagster will generate asset specs for each Airbyte Cloud asset and populate default metadata. You can further customize asset properties by passing an instance of a custom <PyObject module="dagster_airbyte" object="DagsterAirbyteTranslator" /> to the <PyObject module="dagster_airbyte" method="load_airbyte_cloud_asset_specs" /> function.

```python file=/integrations/airbyte-cloud/customize_airbyte_cloud_translator_asset_spec.py
from dagster_airbyte import (
    AirbyteCloudWorkspace,
    AirbyteConnectionTableProps,
    DagsterAirbyteTranslator,
    load_airbyte_cloud_asset_specs,
)

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


# A translator class lets us customize properties of the built
# Airbyte Cloud assets, such as the owners or asset key
class MyCustomAirbyteTranslator(DagsterAirbyteTranslator):
    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
        # We create the default asset spec using super()
        default_spec = super().get_asset_spec(props)
        # We customize the metadata and asset key prefix for all assets
        return default_spec.replace_attributes(
            key=default_spec.key.with_prefix("prefix"),
        ).merge_attributes(metadata={"custom": "metadata"})


airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    airbyte_workspace, dagster_airbyte_translator=MyCustomAirbyteTranslator()
)

defs = dg.Definitions(assets=airbyte_cloud_specs)
```

Note that `super()` is called in each of the overridden methods to generate the default asset spec. It is best practice to generate the default asset spec before customizing it.

You can also pass an instance of the custom <PyObject module="dagster_airbyte" object="DagsterAirbyteTranslator" /> to the <PyObject module="dagster_airbyte" method="airbyte_assets" /> decorator or the <PyObject module="dagster_airbyte" method="build_airbyte_assets_definitions" /> factory.

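The translator pattern of building the default first and then customizing it can be sketched in plain Python. `Spec`, `BaseTranslator`, and `PrefixingTranslator` below are hypothetical stand-ins, not the `dagster` or `dagster-airbyte` classes. They only mimic the idea suggested by the method names in this guide: replacing overwrites an attribute, while merging adds to the existing metadata.

```python
from dataclasses import dataclass, field, replace
from typing import Dict, Tuple


@dataclass(frozen=True)
class Spec:
    """Hypothetical stand-in for an asset spec: a key path plus metadata."""

    key: Tuple[str, ...]
    metadata: Dict[str, str] = field(default_factory=dict)


class BaseTranslator:
    """Hypothetical stand-in for a default translator."""

    def get_spec(self, table_name: str) -> Spec:
        return Spec(key=(table_name,), metadata={"source": "airbyte"})


class PrefixingTranslator(BaseTranslator):
    def get_spec(self, table_name: str) -> Spec:
        # Build the default spec first, then customize it.
        default = super().get_spec(table_name)
        # "Replace": the key is overwritten with a prefixed key.
        prefixed = replace(default, key=("prefix",) + default.key)
        # "Merge": new metadata is added while the default entries are kept.
        return replace(prefixed, metadata={**prefixed.metadata, "custom": "metadata"})


spec = PrefixingTranslator().get_spec("users")
print(spec.key)
print(spec.metadata)
```

Starting from the default keeps any values the base class populates, so the subclass only states what differs.
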
### Load Airbyte Cloud assets from multiple workspaces

Definitions from multiple Airbyte Cloud workspaces can be combined by instantiating multiple <PyObject module="dagster_airbyte" object="AirbyteCloudWorkspace" /> resources and merging their specs. This lets you view all your Airbyte Cloud assets in a single asset graph:

```python file=/integrations/airbyte-cloud/multiple_airbyte_cloud_workspaces.py
from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs

import dagster as dg

sales_airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_SECRET"),
)

marketing_airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_SECRET"),
)

sales_airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    workspace=sales_airbyte_workspace
)
marketing_airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    workspace=marketing_airbyte_workspace
)

# Merge the specs into a single set of definitions
defs = dg.Definitions(
    assets=[*sales_airbyte_cloud_specs, *marketing_airbyte_cloud_specs],
)
```
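
When specs from several workspaces are merged into one list, it can be useful to confirm that no two entries share an asset key. The sketch below uses plain strings in place of real asset keys; `find_duplicate_keys` and the sample key names are hypothetical and only illustrate the check.

```python
from collections import Counter
from typing import Iterable, List


def find_duplicate_keys(*key_lists: Iterable[str]) -> List[str]:
    """Return keys that appear more than once across all lists, sorted."""
    counts = Counter(key for keys in key_lists for key in keys)
    return sorted(key for key, count in counts.items() if count > 1)


# Hypothetical keys standing in for the specs of two workspaces.
sales_keys = ["accounts", "orders"]
marketing_keys = ["campaigns", "accounts"]

duplicates = find_duplicate_keys(sales_keys, marketing_keys)
print(duplicates)
```

If a collision like this occurred with real specs, one option would be a custom translator per workspace that adds a distinguishing key prefix, following the translator example in this guide.
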

examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/__init__.py

Whitespace-only changes.
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    workspace=airbyte_workspace,
    name="airbyte_connection_name",
    group_name="airbyte_connection_name",
)
def airbyte_connection_assets(
    context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace
):
    # Do something before the materialization...
    yield from airbyte.sync_and_poll(context=context)
    # Do something after the materialization...


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
from dagster_airbyte import (
    AirbyteCloudWorkspace,
    AirbyteConnectionTableProps,
    DagsterAirbyteTranslator,
    load_airbyte_cloud_asset_specs,
)

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


# A translator class lets us customize properties of the built
# Airbyte Cloud assets, such as the owners or asset key
class MyCustomAirbyteTranslator(DagsterAirbyteTranslator):
    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
        # We create the default asset spec using super()
        default_spec = super().get_asset_spec(props)
        # We customize the metadata and asset key prefix for all assets
        return default_spec.replace_attributes(
            key=default_spec.key.with_prefix("prefix"),
        ).merge_attributes(metadata={"custom": "metadata"})


airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    airbyte_workspace, dagster_airbyte_translator=MyCustomAirbyteTranslator()
)

defs = dg.Definitions(assets=airbyte_cloud_specs)
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs

import dagster as dg

sales_airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_SECRET"),
)

marketing_airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_SECRET"),
)

sales_airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    workspace=sales_airbyte_workspace
)
marketing_airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    workspace=marketing_airbyte_workspace
)

# Merge the specs into a single set of definitions
defs = dg.Definitions(
    assets=[*sales_airbyte_cloud_specs, *marketing_airbyte_cloud_specs],
)
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_workspace)
defs = dg.Definitions(assets=airbyte_cloud_specs)
Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

all_airbyte_assets = build_airbyte_assets_definitions(workspace=airbyte_workspace)

defs = dg.Definitions(
    assets=all_airbyte_assets,
    resources={"airbyte": airbyte_workspace},
)
