Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Parameteraize code location name in dg projects #26598

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 46 additions & 15 deletions python_modules/libraries/dagster-dg/dagster_dg/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,24 +113,53 @@ def get_code_location_names(self) -> Iterable[str]:
return [loc.name for loc in sorted((self.root_path / "code_locations").iterdir())]


def get_code_location_env_hash(code_location_root_path: Path) -> str:
uv_lock_path = code_location_root_path / "uv.lock"
if not uv_lock_path.exists():
raise DgError(f"uv.lock file not found in {code_location_root_path}")
local_components_path = (
code_location_root_path / code_location_root_path.name / _CODE_LOCATION_COMPONENTS_LIB_DIR
)
if not local_components_path.exists():
raise DgError(f"Local components directory not found in {code_location_root_path}")
@dataclass
class DgCodeLocationMetadataInfo:
name: str
path: Path
local_components_path: Path
uv_lock_path: Path

@classmethod
def from_path(cls, path: Path) -> "DgCodeLocationMetadataInfo":
pyproject_yaml_path = path / "pyproject.toml"

if not pyproject_yaml_path.exists():
raise DgError(f"pyproject.toml not found in {path}")

uv_lock_path = path / "uv.lock"
if not uv_lock_path.exists():
raise DgError(f"uv.lock file not found in {path}")

with open(pyproject_yaml_path) as f:
toml = tomli.loads(f.read())

dagster_block = toml.get("tool", {}).get("dagster")
name = dagster_block.get("name", path.name)
local_components_path = path / name / _CODE_LOCATION_COMPONENTS_LIB_DIR
if not local_components_path.exists():
raise DgError(f"Local components directory not found in {path}")

return cls(
path=path,
name=name,
uv_lock_path=uv_lock_path,
local_components_path=local_components_path,
)


def get_code_location_env_hash(metadata: DgCodeLocationMetadataInfo) -> str:
hasher = hashlib.md5()
hash_file_metadata(hasher, uv_lock_path)
hash_directory_metadata(hasher, local_components_path)
hash_file_metadata(hasher, metadata.uv_lock_path)
hash_directory_metadata(hasher, metadata.local_components_path)
return hasher.hexdigest()


def make_cache_key(code_location_path: Path, data_type: CachableDataType) -> Tuple[str, str, str]:
path_parts = [str(part) for part in code_location_path.parts if part != "/"]
env_hash = get_code_location_env_hash(code_location_path)
def make_cache_key(
metadata: DgCodeLocationMetadataInfo, data_type: CachableDataType
) -> Tuple[str, str, str]:
path_parts = [str(part) for part in metadata.path.parts if part != "/"]
env_hash = get_code_location_env_hash(metadata)
return ("_".join(path_parts), env_hash, data_type)


Expand All @@ -157,9 +186,11 @@ class CodeLocationDirectoryContext:
def from_path(cls, path: Path, dg_context: DgContext) -> Self:
root_path = _resolve_code_location_root_directory(path)

metadata = DgCodeLocationMetadataInfo.from_path(root_path)

cache = dg_context.cache
if cache:
cache_key = make_cache_key(root_path, "component_registry_data")
cache_key = make_cache_key(metadata, "component_registry_data")

raw_registry_data = cache.get(cache_key) if cache else None
if not raw_registry_data:
Expand Down