Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[14/n][dagster-airbyte] Implement materialization method for AirbyteCloudWorkspace #26559

Open
wants to merge 4 commits into
base: maxime/rework-airbyte-cloud-13
Choose a base branch
from

Conversation

maximearmstrong
Copy link
Contributor

@maximearmstrong maximearmstrong commented Dec 18, 2024

Summary & Motivation

This PR implements AirbyteCloudWorkspace.sync_and_poll, the materialization method for Airbyte Cloud assets. This method:

  • calls AirbyteCloudClient.sync_and_poll
  • takes the AirbyteOutput returned by AirbyteCloudClient.sync_and_poll and generates the asset materializations
  • yields MaterializeResult for each expected asset and AssetMaterialization for each unexpected asset
    • a connection table that was not in the connection at definitions loading time can be in the AirbyteOutput. Eg. the table was added after definitions loading time and before sync.
  • logs a warning for each unmaterialized table
    • a connection table can be created at definitions loading time, but can be missing in the AirbyteOutput. Eg. the table was deleted after definitions loading time and before sync.

Can be leveraged like:

from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    name="airbyte_connection_id",
    group_name="airbyte_connection_id",
    workspace=airbyte_workspace,
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
    yield from airbyte.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)

How I Tested These Changes

Additional tests with BK

Changelog

[dagster-airbyte] Airbyte Cloud assets can now be materialized using the AirbyteCloudWorkspace.sync_and_poll(…) method in the definition of a @airbyte_assets decorator.

@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-13 branch from 71c99c0 to 3641abe Compare December 18, 2024 01:07
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-14 branch from 7d77d65 to c717f03 Compare December 18, 2024 01:07
@maximearmstrong maximearmstrong changed the title [14/n][dagster-airbyte] Implement AirbyteCloudWorkspace.sync_and_poll [14/n][dagster-airbyte] Implement materialization method for AirbyteCloudWorkspace Dec 18, 2024
@maximearmstrong maximearmstrong marked this pull request as ready for review December 18, 2024 01:22
@maximearmstrong maximearmstrong self-assigned this Dec 18, 2024
specs=[
spec
spec.merge_attributes(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We add the translator to the metadata to reuse it in the materialization process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the pattern predates this PR, but I feel like it's a pretty big antipattern. Perhaps I'm missing something, but I also don't see a subprocess actually being called anywhere? I don't see why we need to attach the translator to access it, why can't the translator just be an argument that you need to pass?

I guess the argument is that we do the same thing for dbt, likely fivetran, etc. but I think it's a pretty rough pattern that stretches our metadata concept beyond what's intended.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, kinda weird that we don't do this in the spec loader? Feels like there's a potential footgun there if a user for whatever reason decides to use the spec loader then provide to a multi asset, won't contain the translator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, kinda weird that we don't do this in the spec loader? Feels like there's a potential footgun there if a user for whatever reason decides to use the spec loader then provide to a multi asset, won't contain the translator

That's a good call. I agree that it should be set at the spec loader level. Currently, it is set at the asset spec level for dlt, but not for dbt and Sling - I will open a ticket about that.

I know the pattern predates this PR, but I feel like it's a pretty big antipattern. Perhaps I'm missing something, but I also don't see a subprocess actually being called anywhere? I don't see why we need to attach the translator to access it, why can't the translator just be an argument that you need to pass?

I think the main argument here - and for all other integrations - is how XWorkspace.sync_and_poll(...) is expected to be called, which is only in the context of the asset decorator.

Passing the translator to both the decorator and sync_and_poll function is a bit awkward, and would most likely be error-prone.

@airbyte_assets(
    connection_id="airbyte_connection_id",
    name="airbyte_connection_id",
    group_name="airbyte_connection_id",
    workspace=airbyte_workspace,
    dagster_airbyte_translator=CustomDagsterAirbyteTranslator(),
)
def airbyte_connection_assets(context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace):
    yield from airbyte.sync_and_poll(
        context=context, 
        dagster_airbyte_translator=CustomDagsterAirbyteTranslator() # potential errors here if another translator is passed
    )

That said, I agree that leveraging the metadata concept for that is kinda weird. I will open a ticket for that as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to spec loader in 40057f9

Copy link
Contributor

@cmpadden cmpadden Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW the translator in metadata felt like anti-pattern when implementing the dlt integration too, but at that point the goal was trying to be consistent with dbt, and as Maxime said, specifying it in both locations is not ideal. Less ergonomic for the integration maintainer, more ergonomic for the user.

Just want to share that I have the same sentiment.

Copy link
Contributor

@dpeng817 dpeng817 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love the pattern but I agree that to be consistent we probably want to add the translator to the spec metadata.

I think we need to be consistent and also do it in the spec loader.

@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-13 branch from 3641abe to aca5ae6 Compare December 18, 2024 22:42
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-14 branch from 3d4944c to 8450ca9 Compare December 18, 2024 22:42
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-13 branch from aca5ae6 to 3e9f644 Compare December 19, 2024 01:19
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-14 branch from 8450ca9 to 6c0df71 Compare December 19, 2024 01:19
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-13 branch from 3e9f644 to c543523 Compare December 19, 2024 03:29
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-14 branch from 6c0df71 to 7e622f2 Compare December 19, 2024 03:29
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-13 branch from c543523 to 9636e66 Compare December 19, 2024 04:01
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-14 branch from 7e622f2 to b4ebced Compare December 19, 2024 04:01
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-13 branch from 9636e66 to c48ad8a Compare December 19, 2024 15:50
@maximearmstrong maximearmstrong force-pushed the maxime/rework-airbyte-cloud-14 branch from b4ebced to e1831a6 Compare December 19, 2024 15:50
maximearmstrong added a commit that referenced this pull request Dec 20, 2024
…n specs loader (#26587)

## Summary & Motivation

Following [this
discussion](#26559 (comment))
for Airbyte Cloud, we add the translator as metadata as the spec loader
level

## How I Tested These Changes

Same tests with BK
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants