Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backfill daemon run retries 3/n] retries of runs in completed backfills should not be considered part of the backfill #25900

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
BACKFILL_TAGS,
PARENT_RUN_ID_TAG,
PARTITION_NAME_TAG,
RESUME_RETRY_TAG,
ROOT_RUN_ID_TAG,
TAGS_TO_OMIT_ON_RETRY,
TAGS_TO_MAYBE_OMIT_ON_RETRY,
WILL_RETRY_TAG,
)
from dagster._serdes import ConfigurableClass
Expand Down Expand Up @@ -1633,6 +1635,7 @@ def create_reexecuted_run(
run_config: Optional[Mapping[str, Any]] = None,
use_parent_run_tags: bool = False,
) -> DagsterRun:
from dagster._core.execution.backfill import BulkActionStatus
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.execution.plan.state import KnownExecutionState
from dagster._core.remote_representation import CodeLocation, RemoteJob
Expand All @@ -1650,15 +1653,27 @@ def create_reexecuted_run(
parent_run_id = parent_run.run_id

# these can differ from remote_job.tags if tags were added at launch time
parent_run_tags = (
{key: val for key, val in parent_run.tags.items() if key not in TAGS_TO_OMIT_ON_RETRY}
if use_parent_run_tags
else {}
)
parent_run_tags_to_include = {}
if use_parent_run_tags:
parent_run_tags_to_include = {
key: val
for key, val in parent_run.tags.items()
if key not in TAGS_TO_MAYBE_OMIT_ON_RETRY
}
# condition to determine whether to include BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG,
# ROOT_BACKFILL_ID_TAG on retried run
if parent_run.tags.get(BACKFILL_ID_TAG) is not None:
# if the run was part of a backfill and the backfill is complete, we do not want the
# retry to be considered part of the backfill, so remove all backfill-related tags
backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG])
if backfill and backfill.status == BulkActionStatus.REQUESTED:
for tag in BACKFILL_TAGS:
if parent_run.tags.get(tag) is not None:
parent_run_tags_to_include[tag] = parent_run.tags[tag]

tags = merge_dicts(
remote_job.tags,
parent_run_tags,
parent_run_tags_to_include,
extra_tags or {},
{
PARENT_RUN_ID_TAG: parent_run_id,
Expand Down
5 changes: 4 additions & 1 deletion python_modules/dagster/dagster/_core/storage/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,15 @@
RUN_METRICS_POLLING_INTERVAL_TAG = f"{HIDDEN_TAG_PREFIX}run_metrics_polling_interval"
RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics"

BACKFILL_TAGS = {BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG}

TAGS_TO_OMIT_ON_RETRY = {

TAGS_TO_MAYBE_OMIT_ON_RETRY = {
*RUN_METRIC_TAGS,
RUN_FAILURE_REASON_TAG,
WILL_RETRY_TAG,
AUTO_RETRY_RUN_ID_TAG,
*BACKFILL_TAGS,
}


Expand Down
17 changes: 12 additions & 5 deletions python_modules/dagster/dagster_tests/daemon_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,23 @@ def workspace_fixture(instance_module_scoped) -> Iterator[WorkspaceProcessContex
yield workspace_context


@pytest.fixture(name="remote_repo", scope="module")
def remote_repo_fixture(
@pytest.fixture(name="code_location", scope="module")
def code_location_fixture(
workspace_context: WorkspaceProcessContext,
) -> Iterator[RemoteRepository]:
yield cast(
) -> CodeLocation:
return cast(
CodeLocation,
next(
iter(workspace_context.create_request_context().get_code_location_entries().values())
).code_location,
).get_repository("the_repo")
)


@pytest.fixture(name="remote_repo", scope="module")
def remote_repo_fixture(
code_location: CodeLocation,
) -> Iterator[RemoteRepository]:
yield code_location.get_repository("the_repo")


def loadable_target_origin(attribute: Optional[str] = None) -> LoadableTargetOrigin:
Expand Down
86 changes: 86 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._core.definitions.partition import DynamicPartitionsDefinition, PartitionedConfig
from dagster._core.definitions.selector import (
JobSubsetSelector,
PartitionRangeSelector,
PartitionsByAssetSelector,
PartitionsSelector,
Expand All @@ -53,11 +54,13 @@
get_asset_backfill_run_chunk_size,
)
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.remote_representation import (
InProcessCodeLocationOrigin,
RemoteRepository,
RemoteRepositoryOrigin,
)
from dagster._core.remote_representation.code_location import CodeLocation
from dagster._core.storage.compute_log_manager import ComputeIOType
from dagster._core.storage.dagster_run import (
IN_PROGRESS_RUN_STATUSES,
Expand All @@ -69,6 +72,7 @@
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
BACKFILL_ID_TAG,
BACKFILL_TAGS,
MAX_RETRIES_TAG,
PARTITION_NAME_TAG,
)
Expand Down Expand Up @@ -3164,3 +3168,85 @@ def test_asset_backfill_retries_make_downstreams_runnable(
backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
== 0
)


def test_run_retry_not_part_of_completed_backfill(
instance: DagsterInstance,
workspace_context: WorkspaceProcessContext,
code_location: CodeLocation,
remote_repo: RemoteRepository,
):
backfill_id = "run_retries_backfill"
partition_keys = static_partitions.get_partition_keys()
asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
instance.add_backfill(
PartitionBackfill.from_asset_partitions(
asset_graph=workspace_context.create_request_context().asset_graph,
backfill_id=backfill_id,
tags={"custom_tag_key": "custom_tag_value"},
backfill_timestamp=get_current_timestamp(),
asset_selection=asset_selection,
partition_names=partition_keys,
dynamic_partitions_store=instance,
all_partitions=False,
title=None,
description=None,
)
)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
assert instance.get_runs_count() == 3
wait_for_all_runs_to_start(instance, timeout=30)
assert instance.get_runs_count() == 3
wait_for_all_runs_to_finish(instance, timeout=30)

assert instance.get_runs_count() == 3
runs = reversed(list(instance.get_runs()))
for run in runs:
assert run.tags[BACKFILL_ID_TAG] == backfill_id
assert run.tags["custom_tag_key"] == "custom_tag_value"
assert step_succeeded(instance, run, "foo")
assert step_succeeded(instance, run, "reusable")
assert step_succeeded(instance, run, "bar")

list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

# simulate a retry of a run
run_to_retry = instance.get_runs()[0]
selector = JobSubsetSelector(
location_name=code_location.name,
repository_name=remote_repo.name,
job_name=run_to_retry.job_name,
asset_selection=run_to_retry.asset_selection,
op_selection=None,
)
remote_job = code_location.get_job(selector)
retried_run = instance.create_reexecuted_run(
parent_run=run_to_retry,
code_location=code_location,
remote_job=remote_job,
strategy=ReexecutionStrategy.ALL_STEPS,
run_config=run_to_retry.run_config,
use_parent_run_tags=True, # ensures that the logic for not copying over backfill tags is tested
)

for tag in BACKFILL_TAGS:
assert tag not in retried_run.tags.keys()

# Since the backfill is alerady complete, it should not be processed by the backfill daemon and
# should remain in a completed state
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

assert retried_run.run_id not in [
r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id))
]