Skip to content

Commit

Permalink
[components][rfc] @component_loader decorator
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Jan 23, 2025
1 parent bc45aea commit f3750af
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@
from dataclasses import dataclass
from pathlib import Path
from types import ModuleType
from typing import Any, ClassVar, Optional, TypedDict, TypeVar, Union
from typing import Any, Callable, ClassVar, Optional, TypedDict, TypeVar, Union

import click
from dagster import _check as check
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.errors import DagsterError
Expand Down Expand Up @@ -102,16 +101,6 @@ def _clean_docstring(docstring: str) -> str:
return f"{first_line}\n{rest}"


def _get_click_cli_help(command: click.Command) -> str:
    """Return the formatted help text for the parameters of a click command.

    Every parameter of ``command`` except the one named ``"help"`` is asked for
    its help record; parameters that yield no record are left out. The remaining
    records are written as a definition list and the rendered text is returned.
    """
    with click.Context(command) as ctx:
        help_formatter = click.formatting.HelpFormatter()
        help_records = []
        for param in command.get_params(ctx):
            # The built-in help option is not part of the documented parameters.
            if param.name == "help":
                continue
            help_record = param.get_help_record(ctx)
            if help_record:
                help_records.append(help_record)
        help_formatter.write_dl(help_records)
        return help_formatter.getvalue()


class ComponentTypeInternalMetadata(TypedDict):
summary: Optional[str]
description: Optional[str]
Expand Down Expand Up @@ -238,9 +227,12 @@ def for_test(

@property
def path(self) -> Path:
from dagster_components.core.component_decl_builder import YamlComponentDecl
from dagster_components.core.component_decl_builder import (
PythonComponentDecl,
YamlComponentDecl,
)

if not isinstance(self.decl_node, YamlComponentDecl):
if not isinstance(self.decl_node, (YamlComponentDecl, PythonComponentDecl)):
check.failed(f"Unsupported decl_node type {type(self.decl_node)}")

return self.decl_node.path
Expand All @@ -256,6 +248,7 @@ def for_decl_node(self, decl_node: ComponentDeclNode) -> "ComponentLoadContext":


# Attribute name whose value get_component_type_name reads from a registered component type.
COMPONENT_REGISTRY_KEY_ATTR = "__dagster_component_registry_key"
# Attribute name that component_loader sets to True on the functions it decorates.
COMPONENT_LOADER_FN_ATTR = "__dagster_component_loader_fn"


def component_type(cls: Optional[type[Component]] = None, *, name: Optional[str] = None) -> Any:
Expand Down Expand Up @@ -299,3 +292,17 @@ def get_component_type_name(component_type: type[Component]) -> str:
"Expected a registered component. Use @component to register a component.",
)
return getattr(component_type, COMPONENT_REGISTRY_KEY_ATTR)


# Type of the object produced by a loader function; restricted to Component subclasses.
T_Component = TypeVar("T_Component", bound=Component)


def component_loader(
    fn: Callable[[ComponentLoadContext], T_Component],
) -> Callable[[ComponentLoadContext], T_Component]:
    """Mark ``fn`` as a component loader and return it unchanged.

    The function is tagged by setting the ``COMPONENT_LOADER_FN_ATTR`` attribute
    to ``True`` on it, which is what ``is_component_loader`` looks for.

    Args:
        fn: A callable that takes a ``ComponentLoadContext`` and returns a
            ``Component`` instance.

    Returns:
        The same callable object that was passed in, now carrying the marker.
    """
    # The marker lives on the function object itself; no wrapper is created.
    setattr(fn, COMPONENT_LOADER_FN_ATTR, True)
    return fn


def is_component_loader(obj: Any) -> bool:
    """Report whether ``obj`` carries the component-loader marker attribute.

    Returns the value stored under ``COMPONENT_LOADER_FN_ATTR`` on ``obj``, or
    ``False`` when the attribute is absent.
    """
    marker = getattr(obj, COMPONENT_LOADER_FN_ATTR, False)
    return marker
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
from typing import Any, Optional, TypeVar, Union

from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._record import record
from dagster._utils import pushd
from dagster._utils.pydantic_yaml import (
Expand All @@ -19,6 +20,7 @@
ComponentLoadContext,
ComponentTypeRegistry,
get_component_type_name,
is_component_loader,
is_registered_component_type,
)
from dagster_components.utils import load_module_from_path
Expand All @@ -32,6 +34,40 @@ class ComponentFileModel(BaseModel):
T = TypeVar("T", bound=BaseModel)


@record
class PythonComponentDecl(ComponentDeclNode):
    """Declaration node for a component that is defined in a ``component.py`` file.

    ``path`` is the directory holding ``component.py``. Loading the node imports
    that file and calls the single function in it that satisfies
    ``is_component_loader``.
    """

    # Directory that holds the ``component.py`` file.
    path: Path

    @staticmethod
    def component_file_path(path: Path) -> Path:
        """Return the location of ``component.py`` under ``path``."""
        return path / "component.py"

    @staticmethod
    def exists_at(path: Path) -> bool:
        """Return whether ``component.py`` exists under ``path``."""
        component_file = PythonComponentDecl.component_file_path(path)
        return component_file.exists()

    @staticmethod
    def from_path(path: Path) -> "PythonComponentDecl":
        """Create a declaration node for the directory ``path``."""
        return PythonComponentDecl(path=path)

    def load(self, context: ComponentLoadContext) -> Sequence[Component]:
        """Import ``component.py`` and build the component it declares.

        Args:
            context: Load context handed to the loader function.

        Returns:
            A one-element list holding the loader function's return value.

        Raises:
            DagsterInvalidDefinitionError: If the module contains no member
                satisfying ``is_component_loader``, or more than one.
        """
        component_file = PythonComponentDecl.component_file_path(self.path)
        # The module is loaded under the directory's name (the stem of ``path``).
        module = load_module_from_path(self.path.stem, component_file)
        found_loaders = inspect.getmembers(module, is_component_loader)

        if len(found_loaders) < 1:
            raise DagsterInvalidDefinitionError("No component loaders found in module")
        if len(found_loaders) > 1:
            # note: we could support multiple component loaders in the same file, just
            # being more restrictive to start
            raise DagsterInvalidDefinitionError(
                f"Multiple component loaders found in module: {found_loaders}"
            )

        # Exactly one (name, function) pair remains; only the function is needed.
        _, loader_fn = found_loaders[0]
        return [loader_fn(context)]


@record
class YamlComponentDecl(ComponentDeclNode):
path: Path
Expand Down Expand Up @@ -114,7 +150,7 @@ def load(self, context: ComponentLoadContext) -> Sequence[Component]:
@record
class ComponentFolder(ComponentDeclNode):
path: Path
sub_decls: Sequence[Union[YamlComponentDecl, "ComponentFolder"]]
sub_decls: Sequence[Union[YamlComponentDecl, PythonComponentDecl, "ComponentFolder"]]

def load(self, context: ComponentLoadContext) -> Sequence[Component]:
components = []
Expand All @@ -134,6 +170,8 @@ def path_to_decl_node(path: Path) -> Optional[ComponentDeclNode]:

if YamlComponentDecl.exists_at(path):
return YamlComponentDecl.from_path(path)
elif PythonComponentDecl.exists_at(path):
return PythonComponentDecl.from_path(path)

subs = []
for subpath in path.iterdir():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,13 @@ def build_defs(self, context: "ComponentLoadContext") -> "Definitions":

return Definitions(
assets=[self._create_asset_def(path, specs) for path, specs in self.path_specs.items()],
resources={"pipes_client": PipesSubprocessClient()},
)

def _create_asset_def(self, path: Path, specs: Sequence[AssetSpec]) -> AssetsDefinition:
# TODO: allow name parameterization
@multi_asset(specs=specs, name=f"script_{path.stem}")
def _asset(context: AssetExecutionContext, pipes_client: PipesSubprocessClient):
def _asset(context: AssetExecutionContext):
cmd = [shutil.which("python"), path]
return pipes_client.run(command=cmd, context=context).get_results()
return PipesSubprocessClient().run(command=cmd, context=context).get_results()

return _asset
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dagster_components import AssetAttributesModel, ComponentLoadContext
from dagster_components.core.component import component_loader
from dagster_components.lib import PipesSubprocessScriptCollection
from dagster_components.lib.pipes_subprocess_script_collection import (
PipesSubprocessScriptCollectionParams,
PipesSubprocessScriptParams,
)


@component_loader
def load(context: ComponentLoadContext) -> PipesSubprocessScriptCollection:
    """Build a script collection component with a single script.

    The collection contains ``cool_script.py``, which declares one asset keyed
    ``cool_script`` with an eager automation condition template.
    """
    cool_script_asset = AssetAttributesModel(
        key="cool_script",
        automation_condition="{{ automation_condition.eager() }}",
    )
    cool_script = PipesSubprocessScriptParams(
        path="cool_script.py",
        assets=[cool_script_asset],
    )
    params = PipesSubprocessScriptCollectionParams(scripts=[cool_script])
    return PipesSubprocessScriptCollection.load(params=params, context=context)
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
def do_thing() -> None:
    """Placeholder entry point for this script; it performs no work."""
    return None


# Run the placeholder when the file is executed as a script.
if __name__ == "__main__":
    do_thing()
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
from pathlib import Path

from dagster import AssetKey
from dagster_components import AssetAttributesModel
from dagster_components.core.component_decl_builder import ComponentFileModel, YamlComponentDecl
from dagster_components.core.component_decl_builder import PythonComponentDecl
from dagster_components.core.component_defs_builder import (
build_components_from_component_folder,
build_defs_from_component_path,
defs_from_components,
)
from dagster_components.lib.pipes_subprocess_script_collection import (
PipesSubprocessScriptCollection,
PipesSubprocessScriptCollectionParams,
PipesSubprocessScriptParams,
)

from dagster_components_tests.utils import assert_assets, get_asset_keys, script_load_context
Expand All @@ -27,52 +24,24 @@ def test_python_native() -> None:


def test_python_params() -> None:
params = PipesSubprocessScriptCollectionParams(
scripts=[
PipesSubprocessScriptParams(
path="script_one.py",
assets=[
AssetAttributesModel(
key="a", automation_condition="{{ automation_condition.eager() }}"
),
AssetAttributesModel(
key="b",
automation_condition="{{ automation_condition.on_cron('@daily') }}",
deps=["up1", "up2"],
),
],
),
PipesSubprocessScriptParams(
path="subdir/script_three.py",
assets=[AssetAttributesModel(key="key_override")],
),
]
)
component = PipesSubprocessScriptCollection.load(
params=params,
# TODO: we should use a PythonComponentDecl here instead
context=script_load_context(
YamlComponentDecl(
path=Path(LOCATION_PATH / "components" / "scripts"),
component_file_model=ComponentFileModel(type="."),
)
),
)
assert get_asset_keys(component) == {
AssetKey("a"),
AssetKey("b"),
AssetKey("up1"),
AssetKey("up2"),
AssetKey("key_override"),
}
node = PythonComponentDecl(path=Path(LOCATION_PATH / "components" / "script_python_decl"))
context = script_load_context(node)
components = node.load(context)
assert len(components) == 1
component = components[0]

assert get_asset_keys(component) == {AssetKey("cool_script")}


def test_load_from_path() -> None:
components = build_components_from_component_folder(
script_load_context(), LOCATION_PATH / "components"
)
assert len(components) == 1
assert len(components) == 2
assert get_asset_keys(components[0]) == {
AssetKey("cool_script"),
}
assert get_asset_keys(components[1]) == {
AssetKey("a"),
AssetKey("b"),
AssetKey("c"),
Expand All @@ -81,8 +50,6 @@ def test_load_from_path() -> None:
AssetKey("override_key"),
}

assert_assets(components[0], 6)

defs = defs_from_components(
context=script_load_context(),
components=components,
Expand All @@ -96,6 +63,7 @@ def test_load_from_path() -> None:
AssetKey("up1"),
AssetKey("up2"),
AssetKey("override_key"),
AssetKey("cool_script"),
}


Expand Down

0 comments on commit f3750af

Please sign in to comment.