diff --git a/python_modules/libraries/dagster-components/dagster_components/core/component.py b/python_modules/libraries/dagster-components/dagster_components/core/component.py index ae3a0ff11f7d2..622110df9488e 100644 --- a/python_modules/libraries/dagster-components/dagster_components/core/component.py +++ b/python_modules/libraries/dagster-components/dagster_components/core/component.py @@ -10,9 +10,8 @@ from dataclasses import dataclass from pathlib import Path from types import ModuleType -from typing import Any, ClassVar, Optional, TypedDict, TypeVar, Union +from typing import Any, Callable, ClassVar, Optional, TypedDict, TypeVar, Union -import click from dagster import _check as check from dagster._core.definitions.definitions_class import Definitions from dagster._core.errors import DagsterError @@ -102,16 +101,6 @@ def _clean_docstring(docstring: str) -> str: return f"{first_line}\n{rest}" -def _get_click_cli_help(command: click.Command) -> str: - with click.Context(command) as ctx: - formatter = click.formatting.HelpFormatter() - param_records = [ - p.get_help_record(ctx) for p in command.get_params(ctx) if p.name != "help" - ] - formatter.write_dl([pr for pr in param_records if pr]) - return formatter.getvalue() - - class ComponentTypeInternalMetadata(TypedDict): summary: Optional[str] description: Optional[str] @@ -238,9 +227,12 @@ def for_test( @property def path(self) -> Path: - from dagster_components.core.component_decl_builder import YamlComponentDecl + from dagster_components.core.component_decl_builder import ( + PythonComponentDecl, + YamlComponentDecl, + ) - if not isinstance(self.decl_node, YamlComponentDecl): + if not isinstance(self.decl_node, (YamlComponentDecl, PythonComponentDecl)): check.failed(f"Unsupported decl_node type {type(self.decl_node)}") return self.decl_node.path @@ -256,6 +248,7 @@ def for_decl_node(self, decl_node: ComponentDeclNode) -> "ComponentLoadContext": COMPONENT_REGISTRY_KEY_ATTR = "__dagster_component_registry_key" +COMPONENT_LOADER_FN_ATTR = "__dagster_component_fn" def component_type(cls: Optional[type[Component]] = None, *, name: Optional[str] = None) -> Any: @@ -299,3 +292,17 @@ def get_component_type_name(component_type: type[Component]) -> str: "Expected a registered component. Use @component to register a component.", ) return getattr(component_type, COMPONENT_REGISTRY_KEY_ATTR) + + +T_Component = TypeVar("T_Component", bound=Component) + + +def component_loader( + fn: Callable[[ComponentLoadContext], T], +) -> Callable[[ComponentLoadContext], T]: + setattr(fn, COMPONENT_LOADER_FN_ATTR, True) + return fn + + +def is_component_loader(obj: Any) -> bool: + return getattr(obj, COMPONENT_LOADER_FN_ATTR, False) diff --git a/python_modules/libraries/dagster-components/dagster_components/core/component_decl_builder.py b/python_modules/libraries/dagster-components/dagster_components/core/component_decl_builder.py index 0cef2ad9bb323..f0b7938428f63 100644 --- a/python_modules/libraries/dagster-components/dagster_components/core/component_decl_builder.py +++ b/python_modules/libraries/dagster-components/dagster_components/core/component_decl_builder.py @@ -3,6 +3,7 @@ from pathlib import Path from typing import Any, Optional, TypeVar, Union +from dagster._core.errors import DagsterInvalidDefinitionError from dagster._record import record from dagster._utils import pushd from dagster._utils.pydantic_yaml import ( @@ -19,6 +20,7 @@ ComponentLoadContext, ComponentTypeRegistry, get_component_type_name, + is_component_loader, is_registered_component_type, ) from dagster_components.utils import load_module_from_path @@ -32,6 +34,40 @@ class ComponentFileModel(BaseModel): T = TypeVar("T", bound=BaseModel) +@record +class PythonComponentDecl(ComponentDeclNode): + path: Path + + @staticmethod + def component_file_path(path: Path) -> Path: + return path / "component.py" + + @staticmethod + def exists_at(path: Path) -> bool: + return PythonComponentDecl.component_file_path(path).exists() + + @staticmethod + def from_path(path: Path) -> "PythonComponentDecl": + return PythonComponentDecl(path=path) + + def load(self, context: ComponentLoadContext) -> Sequence[Component]: + module = load_module_from_path( + self.path.stem, PythonComponentDecl.component_file_path(self.path) + ) + component_loaders = list(inspect.getmembers(module, is_component_loader)) + if len(component_loaders) < 1: + raise DagsterInvalidDefinitionError("No component loaders found in module") + elif len(component_loaders) > 1: + # note: we could support multiple component loaders in the same file, just + # being more restrictive to start + raise DagsterInvalidDefinitionError( + f"Multiple component loaders found in module: {component_loaders}" + ) + else: + _, component_loader = component_loaders[0] + return [component_loader(context)] + + @record class YamlComponentDecl(ComponentDeclNode): path: Path @@ -114,7 +150,7 @@ def load(self, context: ComponentLoadContext) -> Sequence[Component]: @record class ComponentFolder(ComponentDeclNode): path: Path - sub_decls: Sequence[Union[YamlComponentDecl, "ComponentFolder"]] + sub_decls: Sequence[Union[YamlComponentDecl, PythonComponentDecl, "ComponentFolder"]] def load(self, context: ComponentLoadContext) -> Sequence[Component]: components = [] @@ -134,6 +170,8 @@ def path_to_decl_node(path: Path) -> Optional[ComponentDeclNode]: if YamlComponentDecl.exists_at(path): return YamlComponentDecl.from_path(path) + elif PythonComponentDecl.exists_at(path): + return PythonComponentDecl.from_path(path) subs = [] for subpath in path.iterdir(): diff --git a/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py b/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py index cdf4cf3439206..4a3d7219e991a 100644 --- a/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py +++ b/python_modules/libraries/dagster-components/dagster_components/lib/pipes_subprocess_script_collection.py @@ -66,14 +66,13 @@ def build_defs(self, context: "ComponentLoadContext") -> "Definitions": return Definitions( assets=[self._create_asset_def(path, specs) for path, specs in self.path_specs.items()], - resources={"pipes_client": PipesSubprocessClient()}, ) def _create_asset_def(self, path: Path, specs: Sequence[AssetSpec]) -> AssetsDefinition: # TODO: allow name paraeterization @multi_asset(specs=specs, name=f"script_{path.stem}") - def _asset(context: AssetExecutionContext, pipes_client: PipesSubprocessClient): + def _asset(context: AssetExecutionContext): cmd = [shutil.which("python"), path] - return pipes_client.run(command=cmd, context=context).get_results() + return PipesSubprocessClient().run(command=cmd, context=context).get_results() return _asset diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/script_python_decl/component.py b/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/script_python_decl/component.py new file mode 100644 index 0000000000000..0eed80fe1343f --- /dev/null +++ b/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/script_python_decl/component.py @@ -0,0 +1,25 @@ +from dagster_components import AssetAttributesModel, ComponentLoadContext +from dagster_components.core.component import component_loader +from dagster_components.lib import PipesSubprocessScriptCollection +from dagster_components.lib.pipes_subprocess_script_collection import ( + PipesSubprocessScriptCollectionParams, + PipesSubprocessScriptParams, +) + + +@component_loader +def load(context: ComponentLoadContext) -> PipesSubprocessScriptCollection: + params = PipesSubprocessScriptCollectionParams( + scripts=[ + PipesSubprocessScriptParams( + path="cool_script.py", + assets=[ + AssetAttributesModel( + key="cool_script", + automation_condition="{{ automation_condition.eager() }}", + ), + ], + ), + ] + ) + return PipesSubprocessScriptCollection.load(params=params, context=context) diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/script_python_decl/cool_script.py b/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/script_python_decl/cool_script.py new file mode 100644 index 0000000000000..7cd95eb3907aa --- /dev/null +++ b/python_modules/libraries/dagster-components/dagster_components_tests/code_locations/python_script_location/components/script_python_decl/cool_script.py @@ -0,0 +1,6 @@ +def do_thing() -> None: + pass + + +if __name__ == "__main__": + do_thing() diff --git a/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py b/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py index c6099d18fdf41..d4ae61a06e9e9 100644 --- a/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py +++ b/python_modules/libraries/dagster-components/dagster_components_tests/unit_tests/test_pipes_subprocess_script_collection.py @@ -1,8 +1,7 @@ from pathlib import Path from dagster import AssetKey -from dagster_components import AssetAttributesModel -from dagster_components.core.component_decl_builder import ComponentFileModel, YamlComponentDecl +from dagster_components.core.component_decl_builder import PythonComponentDecl from dagster_components.core.component_defs_builder import ( build_components_from_component_folder, build_defs_from_component_path, @@ -10,8 +9,6 @@ ) from dagster_components.lib.pipes_subprocess_script_collection import ( PipesSubprocessScriptCollection, - PipesSubprocessScriptCollectionParams, - PipesSubprocessScriptParams, ) from dagster_components_tests.utils import assert_assets, get_asset_keys, script_load_context @@ -27,52 +24,24 @@ def test_python_native() -> None: def test_python_params() -> None: - params = PipesSubprocessScriptCollectionParams( - scripts=[ - PipesSubprocessScriptParams( - path="script_one.py", - assets=[ - AssetAttributesModel( - key="a", automation_condition="{{ automation_condition.eager() }}" - ), - AssetAttributesModel( - key="b", - automation_condition="{{ automation_condition.on_cron('@daily') }}", - deps=["up1", "up2"], - ), - ], - ), - PipesSubprocessScriptParams( - path="subdir/script_three.py", - assets=[AssetAttributesModel(key="key_override")], - ), - ] - ) - component = PipesSubprocessScriptCollection.load( - params=params, - # TODO: we should use a PythonComponentDecl here instead - context=script_load_context( - YamlComponentDecl( - path=Path(LOCATION_PATH / "components" / "scripts"), - component_file_model=ComponentFileModel(type="."), - ) - ), - ) - assert get_asset_keys(component) == { - AssetKey("a"), - AssetKey("b"), - AssetKey("up1"), - AssetKey("up2"), - AssetKey("key_override"), - } + node = PythonComponentDecl(path=Path(LOCATION_PATH / "components" / "script_python_decl")) + context = script_load_context(node) + components = node.load(context) + assert len(components) == 1 + component = components[0] + + assert get_asset_keys(component) == {AssetKey("cool_script")} def test_load_from_path() -> None: components = build_components_from_component_folder( script_load_context(), LOCATION_PATH / "components" ) - assert len(components) == 1 + assert len(components) == 2 assert get_asset_keys(components[0]) == { + AssetKey("cool_script"), + } + assert get_asset_keys(components[1]) == { AssetKey("a"), AssetKey("b"), AssetKey("c"), @@ -81,8 +50,6 @@ def test_load_from_path() -> None: AssetKey("override_key"), } - assert_assets(components[0], 6) - defs = defs_from_components( context=script_load_context(), components=components, @@ -96,6 +63,7 @@ def test_load_from_path() -> None: AssetKey("up1"), AssetKey("up2"), AssetKey("override_key"), + AssetKey("cool_script"), }