[2/n][dagster-sling] Move logic to default fns in DagsterSlingTranslator #27140

Open
wants to merge 4 commits into
base: maxime/sling-asset-spec-1
@@ -101,6 +101,45 @@ def get_asset_key_for_target(self, stream_definition) -> AssetKey:
map = {"stream1": "asset1", "stream2": "asset2"}
return AssetKey(map[stream_name])
"""
return self._default_asset_key_fn(stream_definition)

def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKey:
"""A function that takes a stream definition from a Sling replication config and returns a
Dagster AssetKey.

The stream definition is a dictionary key/value pair where the key is the stream name and
the value is a dictionary representing the Sling Replication Stream Config.

For example:

.. code-block:: python

stream_definition = {"public.users":
    {'sql': 'select all_user_id, name from public."all_Users"',
    'object': 'public.all_users'}
}

By default, this returns the class's target_prefix parameter concatenated with the stream name.
A stream named "public.accounts" will create an AssetKey named "target_public_accounts".

Override this function to customize how to map a Sling stream to a Dagster AssetKey.

Alternatively, you can provide metadata in your Sling replication config to specify the
Dagster AssetKey for a stream as follows:

.. code-block:: yaml

public.users:
  meta:
    dagster:
      asset_key: "mydb_users"

Args:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition

Returns:
AssetKey: The Dagster AssetKey for the replication stream.
"""
config = stream_definition.get("config", {}) or {}
object_key = config.get("object")
meta = config.get("meta", {})
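
The default described in the docstring above (target prefix plus stream name, with an optional `meta.dagster.asset_key` override) can be sketched in plain Python. This is a minimal illustration and not the code from this PR: the hunk is cut off after the `meta` lookup, so the `"name"` key, the `sketch_asset_key` helper, the regex used for sanitizing, and the plain string return value (standing in for a Dagster `AssetKey`) are all assumptions. The handling of `object_key` is not visible in the hunk and is left out.

```python
import re
from typing import Any, Mapping


def sketch_asset_key(
    stream_definition: Mapping[str, Any], target_prefix: str = "target"
) -> str:
    """Hypothetical stand-in for the default asset key logic; returns a plain string."""
    # Same defensive lookup as in the diff: a missing or None config becomes {}.
    config = stream_definition.get("config", {}) or {}
    meta = config.get("meta", {}) or {}

    # An explicit key in the replication config's metadata wins over the default.
    override = (meta.get("dagster") or {}).get("asset_key")
    if override:
        return override

    # Assumption: the stream name is stored under a "name" key.
    stream_name = stream_definition["name"]
    # Assumption: every character that is not alphanumeric or "_" becomes "_".
    sanitized = re.sub(r"[^A-Za-z0-9_]", "_", stream_name)
    return f"{target_prefix}_{sanitized}"


default_key = sketch_asset_key({"name": "public.accounts", "config": {}})
custom_key = sketch_asset_key(
    {"name": "public.users", "config": {"meta": {"dagster": {"asset_key": "mydb_users"}}}}
)
```

Under these assumptions, `default_key` reproduces the `target_public_accounts` example from the docstring, and `custom_key` is taken directly from the metadata.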
@@ -155,6 +194,32 @@ def get_deps_asset_key(self, stream_name: str) -> AssetKey:
return AssetKey(map[stream_name])

"""
return self._default_deps_fn(stream_definition)

def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]:
The parameter type in the function signature is incorrect. The method accepts stream_definition: Mapping[str, Any] but the docstring still references stream_name: str. This should be updated to match the actual implementation and maintain consistency with the other translator methods.

Spotted by Graphite Reviewer

"""A function that takes a stream name from a Sling replication config and returns a
Dagster AssetKey for the dependencies of the replication stream.

By default, this returns the stream name. For example, a stream named "public.accounts"
will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts".

Override this function to customize how to map a Sling stream to a Dagster dependency.
Alternatively, you can provide metadata in your Sling replication config to specify the
Dagster AssetKey for a stream as follows:

.. code-block:: yaml

public.users:
  meta:
    dagster:
      deps: "sourcedb_users"

Args:
stream_name (str): The name of the stream.

Returns:
AssetKey: The Dagster AssetKey dependency for the replication stream.
"""
config = stream_definition.get("config", {}) or {}
meta = config.get("meta", {})
deps = meta.get("dagster", {}).get("deps")
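
The dependency default described above (the stream name without the target prefix, unless `meta.dagster.deps` is set) might look roughly like the following. It is a sketch under assumptions, not the PR's implementation: the `"name"` key, the `sketch_deps` helper, the acceptance of either a string or a list for `deps`, and the use of plain strings in place of `AssetKey` objects are all hypothetical.

```python
import re
from typing import Any, List, Mapping


def sketch_deps(stream_definition: Mapping[str, Any]) -> List[str]:
    """Hypothetical stand-in for the default deps logic; returns plain strings."""
    config = stream_definition.get("config", {}) or {}
    meta = config.get("meta", {}) or {}
    deps = (meta.get("dagster") or {}).get("deps")

    # Assumption: "deps" may be a single string or a list of strings.
    if isinstance(deps, str):
        return [deps]
    if deps:
        return list(deps)

    # Default: one upstream dependency named after the stream, without a prefix.
    return [re.sub(r"[^A-Za-z0-9_]", "_", stream_definition["name"])]


default_deps = sketch_deps({"name": "public.accounts", "config": {}})
custom_deps = sketch_deps(
    {"name": "public.users", "config": {"meta": {"dagster": {"deps": "sourcedb_users"}}}}
)
```

Returning a list in every branch keeps the result consistent with the `Iterable[AssetKey]` return annotation on `_default_deps_fn`.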
@@ -184,6 +249,22 @@ def get_description(self, stream_definition: Mapping[str, Any]) -> Optional[str]
for an "sql" key in the configuration and returns its value if found. If not, it looks
for a description in the metadata under the "dagster" key.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.

Returns:
Optional[str]: The description of the stream if found, otherwise None.
"""
return self._default_description_fn(stream_definition)

def _default_description_fn(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
"""Retrieves the description for a given stream definition.

This method checks the provided stream definition for a description. It first looks
for an "sql" key in the configuration and returns its value if found. If not, it looks
for a description in the metadata under the "dagster" key.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.
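
The pattern this PR applies to each translator method (a public `get_*` method that delegates to a private `_default_*_fn`) lets a subclass override the public method and still call the default. Below is a minimal, Dagster-free sketch of that idea using the description lookup. `SketchTranslator` and `FallbackDescriptionTranslator` are hypothetical classes, and the `"description"` key under `meta.dagster` is an assumption inferred from the docstring, since the body of the real method is not shown in the diff.

```python
from typing import Any, Mapping, Optional


class SketchTranslator:
    """Hypothetical, simplified stand-in for the translator class."""

    def get_description(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
        # The public method only delegates, so overriding it never loses the default.
        return self._default_description_fn(stream_definition)

    def _default_description_fn(
        self, stream_definition: Mapping[str, Any]
    ) -> Optional[str]:
        config = stream_definition.get("config", {}) or {}
        # The docstring says the "sql" key is checked first.
        if "sql" in config:
            return config["sql"]
        meta = config.get("meta", {}) or {}
        # Assumption: the metadata key is called "description".
        return (meta.get("dagster") or {}).get("description")


class FallbackDescriptionTranslator(SketchTranslator):
    def get_description(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
        # Reuse the default and only fill in a value when it finds nothing.
        default = self._default_description_fn(stream_definition)
        return default if default is not None else "Replicated by Sling"


sql_stream = {"name": "public.users", "config": {"sql": "select 1"}}
bare_stream = {"name": "public.users", "config": {}}
```

Here `SketchTranslator().get_description(bare_stream)` yields `None`, while the subclass supplies its fallback text for the same input.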
@@ -205,6 +286,21 @@ def get_metadata(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any
This method extracts the configuration from the provided stream definition and returns
it as a JSON metadata value.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.

Returns:
Mapping[str, Any]: A dictionary containing the stream configuration as JSON metadata.
"""
return self._default_metadata_fn(stream_definition)

def _default_metadata_fn(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
"""Retrieves the metadata for a given stream definition.

This method extracts the configuration from the provided stream definition and returns
it as a JSON metadata value.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.
@@ -221,6 +317,21 @@ def get_tags(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
This method returns an empty dictionary, indicating that no tags are associated with
the stream definition by default. This method can be overridden to provide custom tags.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.

Returns:
Mapping[str, Any]: An empty dictionary.
"""
return self._default_tags_fn(stream_definition)

def _default_tags_fn(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
"""Retrieves the tags for a given stream definition.

This method returns an empty dictionary, indicating that no tags are associated with
the stream definition by default. This method can be overridden to provide custom tags.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.
@@ -236,6 +347,20 @@ def get_kinds(self, stream_definition: Mapping[str, Any]) -> set[str]:

This method returns "sling" by default. This method can be overridden to provide custom kinds.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.

Returns:
Set[str]: A set containing kinds for the stream's assets.
"""
return self._default_kinds_fn(stream_definition)

def _default_kinds_fn(self, stream_definition: Mapping[str, Any]) -> set[str]:
"""Retrieves the kinds for a given stream definition.

This method returns "sling" by default. This method can be overridden to provide custom kinds.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.
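
Both the tags and kinds docstrings note that the methods can be overridden. One way to do that while keeping the defaults is to extend the value returned by the `_default_*_fn` helpers. The sketch below uses a hypothetical `SketchTranslator` base and a hypothetical `WarehouseTranslator` subclass. The `"team"` tag and the `"postgres"` kind are made-up values for illustration, and the default bodies follow the docstrings (empty dict, `"sling"`) because the real ones are not shown in the diff.

```python
from typing import Any, Mapping, Set


class SketchTranslator:
    """Hypothetical, simplified stand-in for the translator class."""

    def get_tags(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
        return self._default_tags_fn(stream_definition)

    def _default_tags_fn(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
        return {}  # per the docstring: no tags by default

    def get_kinds(self, stream_definition: Mapping[str, Any]) -> Set[str]:
        return self._default_kinds_fn(stream_definition)

    def _default_kinds_fn(self, stream_definition: Mapping[str, Any]) -> Set[str]:
        return {"sling"}  # per the docstring: "sling" by default


class WarehouseTranslator(SketchTranslator):
    def get_tags(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]:
        # Start from the defaults and add one illustrative tag.
        return {**self._default_tags_fn(stream_definition), "team": "data-platform"}

    def get_kinds(self, stream_definition: Mapping[str, Any]) -> Set[str]:
        # Keep "sling" and add an illustrative storage kind.
        return self._default_kinds_fn(stream_definition) | {"postgres"}


stream = {"name": "public.users", "config": {}}
custom_tags = WarehouseTranslator().get_tags(stream)
custom_kinds = WarehouseTranslator().get_kinds(stream)
```

Building on the default rather than replacing it means a later change to the default (for example, a new built-in kind) would still reach the subclass.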
@@ -252,6 +377,21 @@ def get_group_name(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
This method checks the provided stream definition for a group name in the metadata
under the "dagster" key.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.

Returns:
Optional[str]: The group name if found, otherwise None.
"""
return self._default_group_name_fn(stream_definition)

def _default_group_name_fn(self, stream_definition: Mapping[str, Any]) -> Optional[str]:
"""Retrieves the group name for a given stream definition.

This method checks the provided stream definition for a group name in the metadata
under the "dagster" key.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.
@@ -274,6 +414,26 @@ def get_freshness_policy(
returns a FreshnessPolicy object based on the provided parameters. Otherwise,
it returns None.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.

Returns:
Optional[FreshnessPolicy]: A FreshnessPolicy object if the configuration is found,
otherwise None.
"""
return self._default_freshness_policy_fn(stream_definition)

def _default_freshness_policy_fn(
self, stream_definition: Mapping[str, Any]
) -> Optional[FreshnessPolicy]:
"""Retrieves the freshness policy for a given stream definition.

This method checks the provided stream definition for a specific configuration
indicating a freshness policy. If the configuration is found, it constructs and
returns a FreshnessPolicy object based on the provided parameters. Otherwise,
it returns None.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.
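
The freshness policy lookup described above (build a policy object when the configuration is present, otherwise return `None`) can be sketched without importing Dagster. Everything specific here is an assumption: the `freshness_policy` metadata key, the `maximum_lag_minutes` and `cron_schedule` fields, and the `SketchFreshnessPolicy` dataclass that stands in for Dagster's `FreshnessPolicy`. The diff does not show the real method body.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass(frozen=True)
class SketchFreshnessPolicy:
    """Hypothetical stand-in for a freshness policy object."""

    maximum_lag_minutes: float
    cron_schedule: Optional[str] = None


def sketch_freshness_policy(
    stream_definition: Mapping[str, Any],
) -> Optional[SketchFreshnessPolicy]:
    config = stream_definition.get("config", {}) or {}
    meta = config.get("meta", {}) or {}
    # Assumption: the policy parameters sit under meta.dagster.freshness_policy.
    policy_config = (meta.get("dagster") or {}).get("freshness_policy")
    if not policy_config:
        return None  # no configuration found, so no policy
    return SketchFreshnessPolicy(
        maximum_lag_minutes=float(policy_config["maximum_lag_minutes"]),
        cron_schedule=policy_config.get("cron_schedule"),
    )


with_policy = sketch_freshness_policy(
    {
        "name": "public.users",
        "config": {
            "meta": {"dagster": {"freshness_policy": {"maximum_lag_minutes": 60}}}
        },
    }
)
without_policy = sketch_freshness_policy({"name": "public.users", "config": {}})
```

The auto-materialize policy hunk that follows describes the same shape of check, with a fixed eager policy returned instead of one built from parameters.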
@@ -302,6 +462,25 @@ def get_auto_materialize_policy(
indicating an auto-materialize policy. If the configuration is found, it returns
an eager auto-materialize policy. Otherwise, it returns None.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.

Returns:
Optional[AutoMaterializePolicy]: An eager auto-materialize policy if the configuration
is found, otherwise None.
"""
return self._default_auto_materialize_policy_fn(stream_definition)

def _default_auto_materialize_policy_fn(
self, stream_definition: Mapping[str, Any]
) -> Optional[AutoMaterializePolicy]:
"""Defines the auto-materialize policy for a given stream definition.

This method checks the provided stream definition for a specific configuration
indicating an auto-materialize policy. If the configuration is found, it returns
an eager auto-materialize policy. Otherwise, it returns None.

Parameters:
stream_definition (Mapping[str, Any]): A dictionary representing the stream definition,
which includes configuration details.