Skip to content

Commit

Permalink
[components] Initial implementation of component project scaffolding (#…
Browse files Browse the repository at this point in the history
…26104)

## Summary & Motivation

This is an initial implementation of scaffolding for components. It adds
a new CLI `dg` rather than a subcommand to `dagster`. There is other
work in flight but this seemed like a good place to cut the bottom of
the stack. Implements:

- `dg generate deployment`
- `dg generate code-location`
- `dg generate component-type`
- `dg generate component-instance`

What is NOT in this PR:

- global component registry based on entrypoints-- instead this only
resolves component types local to the code location
- anything resembling final `Component` API. This uses a very bare-bones
prototype implementing `build_defs` and `generate_files` that will be
replaced.
- Calling out to `uv` in scaffolding commands.
- Thoroughly polished output for scaffold commands.
- Final names for commands (`component-type`, `component-instance` are
questionable)

All of ^^ will come upstack or from @OwenKephart efforts.

Note that it was necessary to expose `Component` from top-level
`dagster` so that realistic generated files could be used (`from dagster
import Component`). To avoid exposing this to users I added it as a
dynamic import.

Note also there is some weirdness with `sys.path`, which by default has
`""` as first entry (allowing imports from cwd) but which for some
reason is missing this entry in the test environment. I've hacked it in
with some code that will probably go away once the underlying issue can
be identified.

## How I Tested These Changes

New unit tests.
  • Loading branch information
smackesey authored Nov 26, 2024
1 parent d8c00f1 commit f73593e
Show file tree
Hide file tree
Showing 21 changed files with 621 additions and 9 deletions.
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,13 @@
}


# Use this to expose symbols from top-level dagster for testing purposes during development, before
# we want to expose them to users.
# Maps attribute name -> name of the module that defines it. The module-level `__getattr__`
# imports that module on access and returns the attribute of the same name from it.
_HIDDEN: Final[Mapping[str, str]] = {
    "Component": "dagster._components",
}


def __getattr__(name: str) -> TypingAny:
if name in _DEPRECATED:
module, breaking_version, additional_warn_text = _DEPRECATED[name]
Expand All @@ -687,6 +694,9 @@ def __getattr__(name: str) -> TypingAny:
stacklevel=stacklevel,
)
return value
elif name in _HIDDEN:
module = _HIDDEN[name]
return getattr(importlib.import_module(module), name)
else:
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

Expand Down
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ def group():


def main():
cli(auto_envvar_prefix=ENV_PREFIX) # pylint:disable=E1123
cli(auto_envvar_prefix=ENV_PREFIX)
204 changes: 204 additions & 0 deletions python_modules/dagster/dagster/_components/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import importlib.util
import os
import sys
from abc import ABC, abstractmethod
from types import ModuleType
from typing import TYPE_CHECKING, ClassVar, Dict, Final, Iterable, Optional, Type

from typing_extensions import Self

from dagster._core.errors import DagsterError
from dagster._utils import snakecase

if TYPE_CHECKING:
from dagster._core.definitions.definitions_class import Definitions


class Component(ABC):
    """Bare-bones prototype base class for components.

    Subclasses must implement ``build_defs``. ``generate_files`` raises
    ``NotImplementedError`` unless a subclass overrides it.
    """

    # Explicit registration name. When falsy (the default is None), the snake-cased class
    # name is used instead.
    name: ClassVar[Optional[str]] = None

    @classmethod
    def registered_name(cls):
        """Return ``cls.name`` if it is set, otherwise the snake-cased class name."""
        explicit_name = cls.name
        if explicit_name:
            return explicit_name
        return snakecase(cls.__name__)

    @classmethod
    def generate_files(cls) -> None:
        """Hook for generating files for this component type; the base version always raises."""
        raise NotImplementedError()

    @abstractmethod
    def build_defs(self) -> "Definitions":
        """Return the ``Definitions`` for this component."""
        ...


def is_inside_deployment_project(path: str = ".") -> bool:
    """Report whether a deployment root can be resolved starting from ``path``.

    Returns False when the resolution raises ``DagsterError``; any other exception propagates.
    """
    try:
        _resolve_deployment_root_path(path)
    except DagsterError:
        return False
    else:
        return True


def _resolve_deployment_root_path(path: str) -> str:
    """Walk upward from ``path`` to the nearest enclosing deployment root.

    Args:
        path: Starting location; made absolute with ``os.path.abspath`` before the walk.

    Returns:
        The first path, starting with ``path`` itself and moving to successive parents, for
        which ``_is_deployment_root`` returns True.

    Raises:
        DagsterError: If the walk reaches the filesystem root without finding a match.
    """
    current_path = os.path.abspath(path)
    while not _is_deployment_root(current_path):
        current_path = os.path.dirname(current_path)
        # A filesystem root is the only path that is its own parent. Comparing against the
        # literal "/" never matches a Windows drive root such as "C:\\" (nor a POSIX "//"
        # root), which made this loop spin forever there.
        if os.path.dirname(current_path) == current_path:
            raise DagsterError("Cannot find deployment root")
    return current_path


def is_inside_code_location_project(path: str = ".") -> bool:
    """Report whether a code location root can be resolved starting from ``path``.

    Returns False when the resolution raises ``DagsterError``; any other exception propagates.
    """
    try:
        _resolve_code_location_root_path(path)
    except DagsterError:
        return False
    else:
        return True


def _resolve_code_location_root_path(path: str) -> str:
    """Walk upward from ``path`` to the nearest enclosing code location root.

    Args:
        path: Starting location; made absolute with ``os.path.abspath`` before the walk.

    Returns:
        The first path, starting with ``path`` itself and moving to successive parents, for
        which ``_is_code_location_root`` returns True.

    Raises:
        DagsterError: If the walk reaches the filesystem root without finding a match.
    """
    current_path = os.path.abspath(path)
    while not _is_code_location_root(current_path):
        current_path = os.path.dirname(current_path)
        # A filesystem root is the only path that is its own parent. Comparing against the
        # literal "/" never matches a Windows drive root such as "C:\\" (nor a POSIX "//"
        # root), which made this loop spin forever there.
        if os.path.dirname(current_path) == current_path:
            raise DagsterError("Cannot find code location root")
    return current_path


def _is_deployment_root(path: str) -> bool:
    """Return True if an entry named ``code_locations`` exists directly inside ``path``."""
    code_locations_path = os.path.join(path, "code_locations")
    return os.path.exists(code_locations_path)


def _is_code_location_root(path: str) -> bool:
    """Return True if the parent directory of ``path`` is named ``code_locations``."""
    parent_dir = os.path.dirname(path)
    return os.path.basename(parent_dir) == "code_locations"


# Deployment
# Name of the directory, directly under the deployment root, that holds the code locations.
_DEPLOYMENT_CODE_LOCATIONS_DIR: Final = "code_locations"

# Code location
# Name of the subdirectory (and submodule) of the code location package that holds component types.
_CODE_LOCATION_CUSTOM_COMPONENTS_DIR: Final = "lib"
# Name of the subdirectory of the code location package that holds component instances.
_CODE_LOCATION_COMPONENT_INSTANCES_DIR: Final = "components"


class DeploymentProjectContext:
    """Filesystem layout of a deployment project, anchored at its root directory."""

    @classmethod
    def from_path(cls, path: str) -> Self:
        """Build a context for the deployment that encloses ``path``.

        Raises:
            DagsterError: If no deployment root is found at or above ``path``.
        """
        return cls(root_path=_resolve_deployment_root_path(path))

    def __init__(self, root_path: str):
        self._root_path = root_path

    @property
    def deployment_root(self) -> str:
        """Path of the deployment root directory."""
        return self._root_path

    @property
    def code_location_root_path(self) -> str:
        """Path of the directory under the deployment root that holds the code locations."""
        return os.path.join(self._root_path, _DEPLOYMENT_CODE_LOCATIONS_DIR)

    def has_code_location(self, name: str) -> bool:
        """Return True if an entry called ``name`` exists in the code locations directory."""
        # Build on `code_location_root_path` so the directory name lives in one place
        # instead of being repeated here as a string literal.
        return os.path.exists(os.path.join(self.code_location_root_path, name))


class CodeLocationProjectContext:
    """Filesystem layout and locally registered component types of a single code location."""

    @classmethod
    def from_path(cls, path: str) -> Self:
        """Build a context for the code location that encloses ``path``.

        The code location name is the basename of the resolved root directory. The module
        ``<name>.lib`` is imported and the component types found in it are registered.

        Raises:
            DagsterError: If no code location root or no deployment root encloses ``path``.
            ImportError: If the ``<name>.lib`` module cannot be imported.
        """
        root_path = _resolve_code_location_root_path(path)
        name = os.path.basename(root_path)
        component_registry = ComponentRegistry()

        # TODO: Rm when a more robust solution is implemented
        # Make sure we can import from the cwd
        if sys.path[0] != "":
            sys.path.insert(0, "")

        components_lib_module = f"{name}.{_CODE_LOCATION_CUSTOM_COMPONENTS_DIR}"
        module = importlib.import_module(components_lib_module)
        register_components_in_module(component_registry, module)

        return cls(
            deployment_context=DeploymentProjectContext.from_path(path),
            root_path=root_path,
            # Reuse the name computed above rather than deriving it from root_path again.
            name=name,
            component_registry=component_registry,
        )

    def __init__(
        self,
        deployment_context: DeploymentProjectContext,
        root_path: str,
        name: str,
        component_registry: "ComponentRegistry",
    ):
        self._deployment_context = deployment_context
        self._root_path = root_path
        self._name = name
        self._component_registry = component_registry

    @property
    def deployment_context(self) -> DeploymentProjectContext:
        """Context of the deployment this code location belongs to."""
        return self._deployment_context

    @property
    def component_types_root_path(self) -> str:
        """Directory ``<root>/<name>/lib`` holding the component types."""
        return os.path.join(self._root_path, self._name, _CODE_LOCATION_CUSTOM_COMPONENTS_DIR)

    @property
    def component_types_root_module(self) -> str:
        """Dotted module name ``<name>.lib`` corresponding to ``component_types_root_path``."""
        return f"{self._name}.{_CODE_LOCATION_CUSTOM_COMPONENTS_DIR}"

    def has_component_type(self, name: str) -> bool:
        """Return True if ``name`` is a key in the component registry."""
        return self._component_registry.has(name)

    def get_component_type(self, name: str) -> Type[Component]:
        """Return the component type registered under ``name``.

        Raises:
            DagsterError: If nothing is registered under ``name``.
        """
        if not self.has_component_type(name):
            raise DagsterError(f"No component type named {name}")
        return self._component_registry.get(name)

    @property
    def component_instances_root_path(self) -> str:
        """Directory ``<root>/<name>/components`` holding the component instances."""
        return os.path.join(self._root_path, self._name, _CODE_LOCATION_COMPONENT_INSTANCES_DIR)

    @property
    def component_instances(self) -> Iterable[str]:
        """Entry names in the component instances directory, as returned by ``os.listdir``."""
        # Reuse the path property instead of rebuilding the same join here.
        return os.listdir(self.component_instances_root_path)

    def has_component_instance(self, name: str) -> bool:
        """Return True if an entry called ``name`` exists in the component instances directory."""
        return os.path.exists(os.path.join(self.component_instances_root_path, name))


class ComponentRegistry:
    """In-memory mapping from registration keys to component types."""

    def __init__(self):
        self._components: Dict[str, Type[Component]] = {}

    def register(self, name: str, component: Type[Component]) -> None:
        """Store ``component`` under ``name``, replacing any existing entry with that key."""
        self._components[name] = component

    def has(self, name: str) -> bool:
        """Return True if ``name`` is a registered key."""
        return name in self._components

    def get(self, name: str) -> Type[Component]:
        """Return the component type stored under ``name``; raises ``KeyError`` if absent."""
        return self._components[name]

    def keys(self) -> Iterable[str]:
        """Return the registered keys (the underlying dict's key view)."""
        return self._components.keys()

    def __repr__(self):
        registered_keys = list(self._components)
        return f"<ComponentRegistry {registered_keys}>"


def register_components_in_module(registry: ComponentRegistry, root_module: ModuleType) -> None:
    """Register the ``Component`` subclasses discovered from ``root_module`` into ``registry``.

    For every module yielded by ``find_modules_in_package(root_module)``, each class yielded by
    ``find_subclasses_in_module`` is registered under the key
    ``"<module name>[<registered name>]"``. The ``Component`` base class itself is skipped.
    """
    from dagster._core.definitions.load_assets_from_modules import (
        find_modules_in_package,
        find_subclasses_in_module,
    )

    for submodule in find_modules_in_package(root_module):
        for component_type in find_subclasses_in_module(submodule, (Component,)):
            if component_type is not Component:
                key = f"{submodule.__name__}[{component_type.registered_name()}]"
                registry.register(key, component_type)
28 changes: 28 additions & 0 deletions python_modules/dagster/dagster/_components/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import click

from dagster._components.cli.generate import generate_cli
from dagster.version import __version__


def create_dagster_components_cli():
    """Build and return the top-level click group, with ``generate`` as its only subcommand."""

    @click.group(
        commands={"generate": generate_cli},
        context_settings={"max_content_width": 120, "help_option_names": ["-h", "--help"]},
    )
    @click.version_option(__version__, "--version", "-v")
    def group():
        """CLI tools for working with Dagster."""

    return group


# Prefix that `main` passes to click as `auto_envvar_prefix`.
ENV_PREFIX = "DG_CLI"
# Top-level CLI group, built once when this module is imported.
cli = create_dagster_components_cli()


def main():
    """Run the CLI group, passing ``ENV_PREFIX`` as click's ``auto_envvar_prefix``."""
    cli(auto_envvar_prefix=ENV_PREFIX)
102 changes: 102 additions & 0 deletions python_modules/dagster/dagster/_components/cli/generate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import os
import sys

import click

from dagster._components import (
CodeLocationProjectContext,
DeploymentProjectContext,
is_inside_code_location_project,
is_inside_deployment_project,
)
from dagster._generate.generate import (
generate_code_location,
generate_component_instance,
generate_component_type,
generate_deployment,
)


# Click group named "generate"; its docstring is the help text click displays for the group.
@click.group(name="generate")
def generate_cli():
    """Commands for generating Dagster components and related entities."""


# Registered on `generate_cli` (instead of being a free-standing `click.command`) so that it is
# reachable as the `deployment` subcommand of the `generate` group. The decorated name is still
# a click Command object, so invoking it directly keeps working.
@generate_cli.command(name="deployment")
@click.argument("path", type=str)
def generate_deployment_command(path: str) -> None:
    """Generate a Dagster deployment instance."""
    dir_abspath = os.path.abspath(path)
    # Refuse to write into anything that already exists at the target path.
    if os.path.exists(dir_abspath):
        click.echo(
            click.style(f"A file or directory at {dir_abspath} already exists. ", fg="red")
            + "\nPlease delete the contents of this path or choose another location."
        )
        sys.exit(1)
    generate_deployment(path)


# Registered on `generate_cli` (instead of being a free-standing `click.command`) so that it is
# reachable as the `code-location` subcommand of the `generate` group. The decorated name is
# still a click Command object, so invoking it directly keeps working.
@generate_cli.command(name="code-location")
@click.argument("name", type=str)
def generate_code_location_command(name: str) -> None:
    """Generate a Dagster code location inside a deployment."""
    # The code location is created under the enclosing deployment, so one must exist.
    if not is_inside_deployment_project():
        click.echo(
            click.style("This command must be run inside a Dagster deployment project.", fg="red")
        )
        sys.exit(1)

    context = DeploymentProjectContext.from_path(os.getcwd())
    if context.has_code_location(name):
        click.echo(click.style(f"A code location named {name} already exists.", fg="red"))
        sys.exit(1)

    code_location_path = os.path.join(context.code_location_root_path, name)
    generate_code_location(code_location_path)


# Registered on `generate_cli` (instead of being a free-standing `click.command`) so that it is
# reachable as the `component-type` subcommand of the `generate` group. The decorated name is
# still a click Command object, so invoking it directly keeps working.
@generate_cli.command(name="component-type")
@click.argument("name", type=str)
def generate_component_type_command(name: str) -> None:
    """Generate a Dagster component type."""
    if not is_inside_code_location_project():
        click.echo(
            click.style(
                "This command must be run inside a Dagster code location project.", fg="red"
            )
        )
        sys.exit(1)

    context = CodeLocationProjectContext.from_path(os.getcwd())
    # NOTE(review): `register_components_in_module` stores types under keys of the form
    # "<module>[<registered name>]", so a bare `name` may never match here -- confirm which
    # key format this existence check is meant to receive.
    if context.has_component_type(name):
        click.echo(click.style(f"A component type named `{name}` already exists.", fg="red"))
        sys.exit(1)

    generate_component_type(context.component_types_root_path, name)


# Registered on `generate_cli` (instead of being a free-standing `click.command`) so that it is
# reachable as the `component-instance` subcommand of the `generate` group. The decorated name
# is still a click Command object, so invoking it directly keeps working.
@generate_cli.command(name="component-instance")
@click.argument("component-type", type=str)
@click.argument("name", type=str)
def generate_component_instance_command(component_type: str, name: str) -> None:
    """Generate a Dagster component instance."""
    if not is_inside_code_location_project():
        click.echo(
            click.style(
                "This command must be run inside a Dagster code location project.", fg="red"
            )
        )
        sys.exit(1)

    context = CodeLocationProjectContext.from_path(os.getcwd())
    # The component type must be a key in the registry; the instance name must not exist yet.
    if not context.has_component_type(component_type):
        click.echo(
            click.style(f"No component type `{component_type}` could be resolved.", fg="red")
        )
        sys.exit(1)
    elif context.has_component_instance(name):
        click.echo(click.style(f"A component instance named `{name}` already exists.", fg="red"))
        sys.exit(1)

    component_type_cls = context.get_component_type(component_type)
    generate_component_instance(context.component_instances_root_path, name, component_type_cls)
Loading

0 comments on commit f73593e

Please sign in to comment.