diff --git a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py index 685c7377690e0..83af56fd3fb6c 100644 --- a/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py +++ b/python_modules/libraries/dagster-sling/dagster_sling/dagster_sling_translator.py @@ -101,6 +101,45 @@ def get_asset_key_for_target(self, stream_definition) -> AssetKey: map = {"stream1": "asset1", "stream2": "asset2"} return AssetKey(map[stream_name]) """ + return self._default_asset_key_fn(stream_definition) + + def _default_asset_key_fn(self, stream_definition: Mapping[str, Any]) -> AssetKey: + """A function that takes a stream definition from a Sling replication config and returns a + Dagster AssetKey. + + The stream definition is a dictionary key/value pair where the key is the stream name and + the value is a dictionary representing the Sling Replication Stream Config. + + For example: + + .. code-block:: python + + stream_definition = {"public.users": + {'sql': 'select all_user_id, name from public."all_Users"', + 'object': 'public.all_users'} + } + + By default, this returns the class's target_prefix paramater concatenated with the stream name. + A stream named "public.accounts" will create an AssetKey named "target_public_accounts". + + Override this function to customize how to map a Sling stream to a Dagster AssetKey. + + Alternatively, you can provide metadata in your Sling replication config to specify the + Dagster AssetKey for a stream as follows: + + .. code-block:: yaml + + public.users: + meta: + dagster: + asset_key: "mydb_users" + + Args: + stream_definition (Mapping[str, Any]): A dictionary representing the stream definition + + Returns: + AssetKey: The Dagster AssetKey for the replication stream. + """ config = stream_definition.get("config", {}) or {} object_key = config.get("object") meta = config.get("meta", {}) @@ -155,6 +194,32 @@ def get_deps_asset_key(self, stream_name: str) -> AssetKey: return AssetKey(map[stream_name]) """ + return self._default_deps_fn(stream_definition) + + def _default_deps_fn(self, stream_definition: Mapping[str, Any]) -> Iterable[AssetKey]: + """A function that takes a stream name from a Sling replication config and returns a + Dagster AssetKey for the dependencies of the replication stream. + + By default, this returns the stream name. For example, a stream named "public.accounts" + will create an AssetKey named "target_public_accounts" and a dependency named "public_accounts". + + Override this function to customize how to map a Sling stream to a Dagster depenency. + Alternatively, you can provide metadata in your Sling replication config to specify the + Dagster AssetKey for a stream as follows: + + .. code-block:: yaml + + public.users: + meta: + dagster: + deps: "sourcedb_users" + + Args: + stream_name (str): The name of the stream. + + Returns: + AssetKey: The Dagster AssetKey dependency for the replication stream. + """ config = stream_definition.get("config", {}) or {} meta = config.get("meta", {}) deps = meta.get("dagster", {}).get("deps") @@ -184,6 +249,22 @@ def get_description(self, stream_definition: Mapping[str, Any]) -> Optional[str] for an "sql" key in the configuration and returns its value if found. If not, it looks for a description in the metadata under the "dagster" key. + Parameters: + stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, + which includes configuration details. + + Returns: + Optional[str]: The description of the stream if found, otherwise None. + """ + return self._default_description_fn(stream_definition) + + def _default_description_fn(self, stream_definition: Mapping[str, Any]) -> Optional[str]: + """Retrieves the description for a given stream definition. + + This method checks the provided stream definition for a description. It first looks + for an "sql" key in the configuration and returns its value if found. If not, it looks + for a description in the metadata under the "dagster" key. + Parameters: stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, which includes configuration details. @@ -205,6 +286,21 @@ def get_metadata(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any This method extracts the configuration from the provided stream definition and returns it as a JSON metadata value. + Parameters: + stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, + which includes configuration details. + + Returns: + Mapping[str, Any]: A dictionary containing the stream configuration as JSON metadata. + """ + return self._default_metadata_fn(stream_definition) + + def _default_metadata_fn(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]: + """Retrieves the metadata for a given stream definition. + + This method extracts the configuration from the provided stream definition and returns + it as a JSON metadata value. + Parameters: stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, which includes configuration details. @@ -221,6 +317,21 @@ def get_tags(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]: This method returns an empty dictionary, indicating that no tags are associated with the stream definition by default. This method can be overridden to provide custom tags. + Parameters: + stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, + which includes configuration details. + + Returns: + Mapping[str, Any]: An empty dictionary. + """ + return self._default_tags_fn(stream_definition) + + def _default_tags_fn(self, stream_definition: Mapping[str, Any]) -> Mapping[str, Any]: + """Retrieves the tags for a given stream definition. + + This method returns an empty dictionary, indicating that no tags are associated with + the stream definition by default. This method can be overridden to provide custom tags. + Parameters: stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, which includes configuration details. @@ -236,6 +347,20 @@ def get_kinds(self, stream_definition: Mapping[str, Any]) -> set[str]: This method returns "sling" by default. This method can be overridden to provide custom kinds. + Parameters: + stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, + which includes configuration details. + + Returns: + Set[str]: A set containing kinds for the stream's assets. + """ + return self._default_kinds_fn(stream_definition) + + def _default_kinds_fn(self, stream_definition: Mapping[str, Any]) -> set[str]: + """Retrieves the kinds for a given stream definition. + + This method returns "sling" by default. This method can be overridden to provide custom kinds. + Parameters: stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, which includes configuration details. @@ -252,6 +377,21 @@ def get_group_name(self, stream_definition: Mapping[str, Any]) -> Optional[str]: This method checks the provided stream definition for a group name in the metadata under the "dagster" key. + Parameters: + stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, + which includes configuration details. + + Returns: + Optional[str]: The group name if found, otherwise None. + """ + return self._default_group_name_fn(stream_definition) + + def _default_group_name_fn(self, stream_definition: Mapping[str, Any]) -> Optional[str]: + """Retrieves the group name for a given stream definition. + + This method checks the provided stream definition for a group name in the metadata + under the "dagster" key. + Parameters: stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, which includes configuration details. @@ -274,6 +414,26 @@ def get_freshness_policy( returns a FreshnessPolicy object based on the provided parameters. Otherwise, it returns None. + Parameters: + stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, + which includes configuration details. + + Returns: + Optional[FreshnessPolicy]: A FreshnessPolicy object if the configuration is found, + otherwise None. + """ + return self._default_freshness_policy_fn(stream_definition) + + def _default_freshness_policy_fn( + self, stream_definition: Mapping[str, Any] + ) -> Optional[FreshnessPolicy]: + """Retrieves the freshness policy for a given stream definition. + + This method checks the provided stream definition for a specific configuration + indicating a freshness policy. If the configuration is found, it constructs and + returns a FreshnessPolicy object based on the provided parameters. Otherwise, + it returns None. + Parameters: stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, which includes configuration details. @@ -302,6 +462,25 @@ def get_auto_materialize_policy( indicating an auto-materialize policy. If the configuration is found, it returns an eager auto-materialize policy. Otherwise, it returns None. + Parameters: + stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, + which includes configuration details. + + Returns: + Optional[AutoMaterializePolicy]: An eager auto-materialize policy if the configuration + is found, otherwise None. + """ + return self._default_auto_materialize_policy_fn(stream_definition) + + def _default_auto_materialize_policy_fn( + self, stream_definition: Mapping[str, Any] + ) -> Optional[AutoMaterializePolicy]: + """Defines the auto-materialize policy for a given stream definition. + + This method checks the provided stream definition for a specific configuration + indicating an auto-materialize policy. If the configuration is found, it returns + an eager auto-materialize policy. Otherwise, it returns None. + Parameters: stream_definition (Mapping[str, Any]): A dictionary representing the stream definition, which includes configuration details.