Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix filter_to_subgraph when passing the only argument #3197

Merged
merged 2 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/step_update/cli.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -342,6 +342,7 @@ def update_steps(
excludes=[],
downstream=False,
only=True,
exact_match=True,
)

message = "The following steps will be updated:"
Expand Down
16 changes: 13 additions & 3 deletions etl/steps/__init__.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -76,13 +76,18 @@ def to_dependency_order(
excludes: List[str],
downstream: bool = False,
only: bool = False,
exact_match: bool = False,
) -> List[str]:
"""
Organize the steps in dependency order with a topological sort. In other words,
the resulting list of steps is a valid ordering of steps such that no step is run
before the steps it depends on. Note: this ordering is not necessarily unique.
"""
subgraph = filter_to_subgraph(dag, includes, downstream=downstream, only=only) if includes else dag
subgraph = (
filter_to_subgraph(dag, includes, downstream=downstream, only=only, exact_match=exact_match)
if includes
else dag
)
in_order = list(graphlib.TopologicalSorter(subgraph).static_order())

# filter out explicit excludes
Expand All @@ -91,7 +96,9 @@ def to_dependency_order(
return filtered


def filter_to_subgraph(graph: Graph, includes: Iterable[str], downstream: bool = False, only: bool = False) -> Graph:
def filter_to_subgraph(
graph: Graph, includes: Iterable[str], downstream: bool = False, only: bool = False, exact_match: bool = False
) -> Graph:
"""
Filter the full graph to only the included nodes, and all their dependencies.

Expand All @@ -102,7 +109,10 @@ def filter_to_subgraph(graph: Graph, includes: Iterable[str], downstream: bool =
dependent on B).
"""
all_steps = graph_nodes(graph)
included = {s for s in all_steps if any(re.findall(pattern, s) for pattern in includes)}
if exact_match:
included = set(includes) & all_steps
else:
included = {s for s in all_steps if any(re.findall(pattern, s) for pattern in includes)}

if only:
# Only include explicitly selected nodes
Expand Down
Loading