Skip to content

Commit 8891639

Browse files
authored
[backfill daemon run retries 3/n] retries of runs in completed backfills should not be considered part of the backfill (#25900)
## Summary & Motivation If a run is retried after a backfill is complete, that run is given the backfill tag, but has no affect on the backfill itself. This can cause confusion. Imagine the scenario where a single asset-partition failed in a backfill. The backfill is complete and a user retries the failed asset and the retry succeeds. That retried run will show up in the list of runs for the backfill, but the status in the overview tab for partition will still be failed since the status is locked when the backfill completes. We should be more strict about when run retries are considered part of the backfill. We decided in dagster-io/internal#12460 that retries that are launched while the backfill is in progress will be part of the backfill, but that retries that are launched after the backfill is complete should not be considered part of the backfill. To make this change we need to remove the backfill tag from retried runs if the backfill is not in progress. ## How I Tested These Changes new unit tests manually launched a retry of a run that was launched by a backfill after the backfill was complete. no backfill tags were added <img width="1037" alt="Screenshot 2024-12-02 at 1 55 09 PM" src="https://github.com/user-attachments/assets/5bb8ae12-4c61-4fd4-8255-1d245ae43318"> ## Changelog Manual retries of runs launched by backfills are no longer considered part of the backfill if the backfill is complete when the retry is launched.
1 parent e8290c7 commit 8891639

File tree

4 files changed

+124
-13
lines changed

4 files changed

+124
-13
lines changed

python_modules/dagster/dagster/_core/instance/__init__.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,13 @@
7171
from dagster._core.storage.tags import (
7272
ASSET_PARTITION_RANGE_END_TAG,
7373
ASSET_PARTITION_RANGE_START_TAG,
74+
BACKFILL_ID_TAG,
75+
BACKFILL_TAGS,
7476
PARENT_RUN_ID_TAG,
7577
PARTITION_NAME_TAG,
7678
RESUME_RETRY_TAG,
7779
ROOT_RUN_ID_TAG,
78-
TAGS_TO_OMIT_ON_RETRY,
80+
TAGS_TO_MAYBE_OMIT_ON_RETRY,
7981
WILL_RETRY_TAG,
8082
)
8183
from dagster._serdes import ConfigurableClass
@@ -1633,6 +1635,7 @@ def create_reexecuted_run(
16331635
run_config: Optional[Mapping[str, Any]] = None,
16341636
use_parent_run_tags: bool = False,
16351637
) -> DagsterRun:
1638+
from dagster._core.execution.backfill import BulkActionStatus
16361639
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
16371640
from dagster._core.execution.plan.state import KnownExecutionState
16381641
from dagster._core.remote_representation import CodeLocation, RemoteJob
@@ -1650,15 +1653,27 @@ def create_reexecuted_run(
16501653
parent_run_id = parent_run.run_id
16511654

16521655
# these can differ from remote_job.tags if tags were added at launch time
1653-
parent_run_tags = (
1654-
{key: val for key, val in parent_run.tags.items() if key not in TAGS_TO_OMIT_ON_RETRY}
1655-
if use_parent_run_tags
1656-
else {}
1657-
)
1656+
parent_run_tags_to_include = {}
1657+
if use_parent_run_tags:
1658+
parent_run_tags_to_include = {
1659+
key: val
1660+
for key, val in parent_run.tags.items()
1661+
if key not in TAGS_TO_MAYBE_OMIT_ON_RETRY
1662+
}
1663+
# condition to determine whether to include BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG,
1664+
# ROOT_BACKFILL_ID_TAG on retried run
1665+
if parent_run.tags.get(BACKFILL_ID_TAG) is not None:
1666+
# if the run was part of a backfill and the backfill is complete, we do not want the
1667+
# retry to be considered part of the backfill, so remove all backfill-related tags
1668+
backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG])
1669+
if backfill and backfill.status == BulkActionStatus.REQUESTED:
1670+
for tag in BACKFILL_TAGS:
1671+
if parent_run.tags.get(tag) is not None:
1672+
parent_run_tags_to_include[tag] = parent_run.tags[tag]
16581673

16591674
tags = merge_dicts(
16601675
remote_job.tags,
1661-
parent_run_tags,
1676+
parent_run_tags_to_include,
16621677
extra_tags or {},
16631678
{
16641679
PARENT_RUN_ID_TAG: parent_run_id,

python_modules/dagster/dagster/_core/storage/tags.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,15 @@
108108
RUN_METRICS_POLLING_INTERVAL_TAG = f"{HIDDEN_TAG_PREFIX}run_metrics_polling_interval"
109109
RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics"
110110

111+
BACKFILL_TAGS = {BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG}
111112

112-
TAGS_TO_OMIT_ON_RETRY = {
113+
114+
TAGS_TO_MAYBE_OMIT_ON_RETRY = {
113115
*RUN_METRIC_TAGS,
114116
RUN_FAILURE_REASON_TAG,
115117
WILL_RETRY_TAG,
116118
AUTO_RETRY_RUN_ID_TAG,
119+
*BACKFILL_TAGS,
117120
}
118121

119122

python_modules/dagster/dagster_tests/daemon_tests/conftest.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,23 @@ def workspace_fixture(instance_module_scoped) -> Iterator[WorkspaceProcessContex
6464
yield workspace_context
6565

6666

67-
@pytest.fixture(name="remote_repo", scope="module")
68-
def remote_repo_fixture(
67+
@pytest.fixture(name="code_location", scope="module")
68+
def code_location_fixture(
6969
workspace_context: WorkspaceProcessContext,
70-
) -> Iterator[RemoteRepository]:
71-
yield cast(
70+
) -> CodeLocation:
71+
return cast(
7272
CodeLocation,
7373
next(
7474
iter(workspace_context.create_request_context().get_code_location_entries().values())
7575
).code_location,
76-
).get_repository("the_repo")
76+
)
77+
78+
79+
@pytest.fixture(name="remote_repo", scope="module")
80+
def remote_repo_fixture(
81+
code_location: CodeLocation,
82+
) -> Iterator[RemoteRepository]:
83+
yield code_location.get_repository("the_repo")
7784

7885

7986
def loadable_target_origin(attribute: Optional[str] = None) -> LoadableTargetOrigin:

python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from dagster._core.definitions.events import AssetKeyPartitionKey
4242
from dagster._core.definitions.partition import DynamicPartitionsDefinition, PartitionedConfig
4343
from dagster._core.definitions.selector import (
44+
JobSubsetSelector,
4445
PartitionRangeSelector,
4546
PartitionsByAssetSelector,
4647
PartitionsSelector,
@@ -53,11 +54,13 @@
5354
get_asset_backfill_run_chunk_size,
5455
)
5556
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
57+
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
5658
from dagster._core.remote_representation import (
5759
InProcessCodeLocationOrigin,
5860
RemoteRepository,
5961
RemoteRepositoryOrigin,
6062
)
63+
from dagster._core.remote_representation.code_location import CodeLocation
6164
from dagster._core.storage.compute_log_manager import ComputeIOType
6265
from dagster._core.storage.dagster_run import (
6366
IN_PROGRESS_RUN_STATUSES,
@@ -69,6 +72,7 @@
6972
ASSET_PARTITION_RANGE_END_TAG,
7073
ASSET_PARTITION_RANGE_START_TAG,
7174
BACKFILL_ID_TAG,
75+
BACKFILL_TAGS,
7276
MAX_RETRIES_TAG,
7377
PARTITION_NAME_TAG,
7478
)
@@ -3164,3 +3168,85 @@ def test_asset_backfill_retries_make_downstreams_runnable(
31643168
backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
31653169
== 0
31663170
)
3171+
3172+
3173+
def test_run_retry_not_part_of_completed_backfill(
3174+
instance: DagsterInstance,
3175+
workspace_context: WorkspaceProcessContext,
3176+
code_location: CodeLocation,
3177+
remote_repo: RemoteRepository,
3178+
):
3179+
backfill_id = "run_retries_backfill"
3180+
partition_keys = static_partitions.get_partition_keys()
3181+
asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
3182+
instance.add_backfill(
3183+
PartitionBackfill.from_asset_partitions(
3184+
asset_graph=workspace_context.create_request_context().asset_graph,
3185+
backfill_id=backfill_id,
3186+
tags={"custom_tag_key": "custom_tag_value"},
3187+
backfill_timestamp=get_current_timestamp(),
3188+
asset_selection=asset_selection,
3189+
partition_names=partition_keys,
3190+
dynamic_partitions_store=instance,
3191+
all_partitions=False,
3192+
title=None,
3193+
description=None,
3194+
)
3195+
)
3196+
assert instance.get_runs_count() == 0
3197+
backfill = instance.get_backfill(backfill_id)
3198+
assert backfill
3199+
assert backfill.status == BulkActionStatus.REQUESTED
3200+
3201+
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
3202+
assert instance.get_runs_count() == 3
3203+
wait_for_all_runs_to_start(instance, timeout=30)
3204+
assert instance.get_runs_count() == 3
3205+
wait_for_all_runs_to_finish(instance, timeout=30)
3206+
3207+
assert instance.get_runs_count() == 3
3208+
runs = reversed(list(instance.get_runs()))
3209+
for run in runs:
3210+
assert run.tags[BACKFILL_ID_TAG] == backfill_id
3211+
assert run.tags["custom_tag_key"] == "custom_tag_value"
3212+
assert step_succeeded(instance, run, "foo")
3213+
assert step_succeeded(instance, run, "reusable")
3214+
assert step_succeeded(instance, run, "bar")
3215+
3216+
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
3217+
backfill = instance.get_backfill(backfill_id)
3218+
assert backfill
3219+
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
3220+
3221+
# simulate a retry of a run
3222+
run_to_retry = instance.get_runs()[0]
3223+
selector = JobSubsetSelector(
3224+
location_name=code_location.name,
3225+
repository_name=remote_repo.name,
3226+
job_name=run_to_retry.job_name,
3227+
asset_selection=run_to_retry.asset_selection,
3228+
op_selection=None,
3229+
)
3230+
remote_job = code_location.get_job(selector)
3231+
retried_run = instance.create_reexecuted_run(
3232+
parent_run=run_to_retry,
3233+
code_location=code_location,
3234+
remote_job=remote_job,
3235+
strategy=ReexecutionStrategy.ALL_STEPS,
3236+
run_config=run_to_retry.run_config,
3237+
use_parent_run_tags=True, # ensures that the logic for not copying over backfill tags is tested
3238+
)
3239+
3240+
for tag in BACKFILL_TAGS:
3241+
assert tag not in retried_run.tags.keys()
3242+
3243+
# Since the backfill is alerady complete, it should not be processed by the backfill daemon and
3244+
# should remain in a completed state
3245+
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
3246+
backfill = instance.get_backfill(backfill_id)
3247+
assert backfill
3248+
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
3249+
3250+
assert retried_run.run_id not in [
3251+
r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id))
3252+
]

0 commit comments

Comments
 (0)