From a6d74b11ddf0cccf1cc928065e955bcaefc330b9 Mon Sep 17 00:00:00 2001
From: JamieDeMaria
Date: Tue, 12 Nov 2024 16:19:05 -0500
Subject: [PATCH 1/7] start process of removing tags when a run is retried from a completed backfill

---
 .../dagster/_core/instance/__init__.py    | 24 +++++++++++++++----
 .../dagster/dagster/_core/storage/tags.py |  4 ++++
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py
index faadb38006a6a..34f5f861a2ef1 100644
--- a/python_modules/dagster/dagster/_core/instance/__init__.py
+++ b/python_modules/dagster/dagster/_core/instance/__init__.py
@@ -71,10 +71,13 @@ from dagster._core.storage.tags import (
     ASSET_PARTITION_RANGE_END_TAG,
     ASSET_PARTITION_RANGE_START_TAG,
+    BACKFILL_ID_TAG,
+    BACKFILL_TAGS,
     PARENT_RUN_ID_TAG,
     PARTITION_NAME_TAG,
     RESUME_RETRY_TAG,
     ROOT_RUN_ID_TAG,
+    TAGS_TO_MAYBE_OMIT_ON_RETRY,
     TAGS_TO_OMIT_ON_RETRY,
     WILL_RETRY_TAG,
 )
@@ -1633,6 +1636,7 @@ def create_reexecuted_run(
         run_config: Optional[Mapping[str, Any]] = None,
         use_parent_run_tags: bool = False,
     ) -> DagsterRun:
+        from dagster._core.execution.backfill import BulkActionStatus
         from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
         from dagster._core.execution.plan.state import KnownExecutionState
         from dagster._core.remote_representation import CodeLocation, RemoteJob
@@ -1650,11 +1654,21 @@ def create_reexecuted_run(
         parent_run_id = parent_run.run_id
 
         # these can differ from remote_job.tags if tags were added at launch time
-        parent_run_tags = (
-            {key: val for key, val in parent_run.tags.items() if key not in TAGS_TO_OMIT_ON_RETRY}
-            if use_parent_run_tags
-            else {}
-        )
+        parent_run_tags = {}
+        if use_parent_run_tags:
+            parent_run_tags = {
+                key: val
+                for key, val in parent_run.tags.items()
+                if key not in TAGS_TO_OMIT_ON_RETRY and key not in TAGS_TO_MAYBE_OMIT_ON_RETRY
+            }
+            # if the run was part of a backfill and the backfill is complete, we do not want the
+            # retry to be considered part of the backfill, so remove all backfill-related tags
+            if parent_run.tags.get(BACKFILL_ID_TAG) is not None:
+                backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG])
+                if backfill.status == BulkActionStatus.REQUESTED:
+                    for tag in BACKFILL_TAGS:
+                        if parent_run.tags.get(tag) is not None:
+                            parent_run_tags[tag] = parent_run.tags[tag]
 
         tags = merge_dicts(
             remote_job.tags,
diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py
index c510becec9772..2e67826f4d3b9 100644
--- a/python_modules/dagster/dagster/_core/storage/tags.py
+++ b/python_modules/dagster/dagster/_core/storage/tags.py
@@ -108,6 +108,8 @@
 RUN_METRICS_POLLING_INTERVAL_TAG = f"{HIDDEN_TAG_PREFIX}run_metrics_polling_interval"
 RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics"
 
+BACKFILL_TAGS = {BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG}
+
 
 TAGS_TO_OMIT_ON_RETRY = {
     *RUN_METRIC_TAGS,
@@ -116,6 +118,8 @@
     AUTO_RETRY_RUN_ID_TAG,
 }
 
+TAGS_TO_MAYBE_OMIT_ON_RETRY = {*BACKFILL_TAGS}
+
 
 class TagType(Enum):
     # Custom tag provided by a user

From e0102413ef5c4ac3d7c0ba100bc051faa6c04e5e Mon Sep 17 00:00:00 2001
From: JamieDeMaria
Date: Wed, 13 Nov 2024 13:24:12 -0500
Subject: [PATCH 2/7] try add test

---
 .../graphql/test_partition_backfill.py | 61 ++++++++++++
 .../dagster/_core/instance/__init__.py |  8 +-
 .../daemon_tests/test_backfill.py      | 95 +++++++++++++++++++
 3 files changed, 162 insertions(+), 2 deletions(-)

diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
index 8d12e2e0c9f5f..5015036056062 100644
--- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
@@ -37,6 +37,7 @@
 from dagster_graphql.client.query import (
     LAUNCH_PARTITION_BACKFILL_MUTATION,
     LAUNCH_PIPELINE_EXECUTION_MUTATION,
+    LAUNCH_PIPELINE_REEXECUTION_MUTATION,
 )
 from dagster_graphql.test.utils import (
     execute_dagster_graphql,
@@ -2309,3 +2310,63 @@ def test_retry_successful_job_backfill(self, graphql_context):
 
         assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id
         assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id
+
+    def test_run_retry_not_part_of_completed_backfill(self, graphql_context):
+        # TestLaunchDaemonBackfillFromFailure::test_run_retry_not_part_of_completed_backfill
+        repository_selector = infer_repository_selector(graphql_context)
+        result = execute_dagster_graphql(
+            graphql_context,
+            LAUNCH_PARTITION_BACKFILL_MUTATION,
+            variables={
+                "backfillParams": {
+                    "selector": {
+                        "repositorySelector": repository_selector,
+                        "partitionSetName": "integers_partition_set",
+                    },
+                    "partitionNames": ["2", "3", "4", "5"],
+                }
+            },
+        )
+
+        assert not result.errors
+        assert result.data
+        assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
+        backfill_id = result.data["launchPartitionBackfill"]["backfillId"]
+
+        _seed_runs(
+            graphql_context,
+            [
+                (DagsterRunStatus.SUCCESS, "5"),
+                (DagsterRunStatus.SUCCESS, "2"),
+                (DagsterRunStatus.SUCCESS, "3"),
+                (DagsterRunStatus.SUCCESS, "4"),
+                (DagsterRunStatus.SUCCESS, "5"),
+                (DagsterRunStatus.SUCCESS, "2"),
+                (DagsterRunStatus.FAILURE, "3"),
+                (DagsterRunStatus.SUCCESS, "4"),
+            ],
+            backfill_id,
+        )
+
+        backfill = graphql_context.instance.get_backfill(backfill_id)
+        graphql_context.instance.update_backfill(
+            backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
+        )
+
+        failed_run = graphql_context.instance.get_runs(
+            filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])
+        )[0]
+
+        retry_run_result = execute_dagster_graphql(
+            graphql_context,
+            LAUNCH_PIPELINE_REEXECUTION_MUTATION,
+            variables={
+                "reexecutionParams": {"parentRunId": failed_run.run_id, "strategy": "ALL_STEPS"}
+            },
+        )
+        assert not retry_run_result.errors
+        assert retry_run_result.data
+        assert (
+            retry_run_result.data["launchPipelineReexecution"]["__typename"]
+            == "LaunchPipelineReexecutionSuccess"
+        )
diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py
index 34f5f861a2ef1..9adde170fc66f 100644
--- a/python_modules/dagster/dagster/_core/instance/__init__.py
+++ b/python_modules/dagster/dagster/_core/instance/__init__.py
@@ -1661,9 +1661,13 @@ def create_reexecuted_run(
                 for key, val in parent_run.tags.items()
                 if key not in TAGS_TO_OMIT_ON_RETRY and key not in TAGS_TO_MAYBE_OMIT_ON_RETRY
             }
-            # if the run was part of a backfill and the backfill is complete, we do not want the
-            # retry to be considered part of the backfill, so remove all backfill-related tags
+            # for all tags in TAGS_TO_MAYBE_OMIT_ON_RETRY, add a condition that determines
+            # whether the tag should be added to the retried run
+
+            # condition for BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG
             if parent_run.tags.get(BACKFILL_ID_TAG) is not None:
+                # if the run was part of a backfill and the backfill is complete, we do not want the
+                # retry to be considered part of the backfill, so remove all backfill-related tags
                 backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG])
                 if backfill.status == BulkActionStatus.REQUESTED:
                     for tag in BACKFILL_TAGS:
diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
index 50077a20ce36e..cd2c386297dab 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -3164,3 +3164,98 @@ def test_asset_backfill_retries_make_downstreams_runnable(
         backfill.asset_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
         == 0
     )
+
+
+def test_run_retry_not_part_of_completed_backfill(
+    instance: DagsterInstance,
+    workspace_context: WorkspaceProcessContext,
+    remote_repo: RemoteRepository,
+):
+    del remote_repo
+    backfill_id = "run_retries_backfill"
+    partition_keys = static_partitions.get_partition_keys()
+    asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
+    instance.add_backfill(
+        PartitionBackfill.from_asset_partitions(
+            asset_graph=workspace_context.create_request_context().asset_graph,
+            backfill_id=backfill_id,
+            tags={"custom_tag_key": "custom_tag_value"},
+            backfill_timestamp=get_current_timestamp(),
+            asset_selection=asset_selection,
+            partition_names=partition_keys,
+            dynamic_partitions_store=instance,
+            all_partitions=False,
+            title=None,
+            description=None,
+        )
+    )
+    assert instance.get_runs_count() == 0
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_start(instance, timeout=30)
+    assert instance.get_runs_count() == 3
+    wait_for_all_runs_to_finish(instance, timeout=30)
+
+    assert instance.get_runs_count() == 3
+    runs = reversed(list(instance.get_runs()))
+    for run in runs:
+        assert run.tags[BACKFILL_ID_TAG] == backfill_id
+        assert run.tags["custom_tag_key"] == "custom_tag_value"
+        assert step_succeeded(instance, run, "foo")
+        assert step_succeeded(instance, run, "reusable")
+        assert step_succeeded(instance, run, "bar")
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
+
+    # manual retry of a run
+    instance.create_reexecuted_run()
+
+    # simulate a retry of a run
+    run_to_retry = instance.get_runs()[0]
+    retried_run = create_run_for_test(
+        instance=instance,
+        job_name=run_to_retry.job_name,
+        tags=run_to_retry.tags,
+        root_run_id=run_to_retry.run_id,
+        parent_run_id=run_to_retry.run_id,
+    )
+
+    # since there is a run in progress, the backfill should not be marked as complete, even though
+    # all targeted asset partitions have a completed state
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.asset_backfill_data
+    assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    # manually mark the run as successful to show that the backfill will be marked as complete
+    # since there are no in progress runs
+    instance.handle_new_event(
+        EventLogEntry(
+            error_info=None,
+            level="debug",
+            user_message="",
+            run_id=retried_run.run_id,
+            timestamp=time.time(),
+            dagster_event=DagsterEvent(
+                event_type_value=DagsterEventType.RUN_SUCCESS.value,
+                job_name=retried_run.job_name,
+            ),
+        )
+    )
+
+    retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0]
+    assert retried_run.status == DagsterRunStatus.SUCCESS
+
+    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

From 3204df0b3b64a7b14d989c95433b314b60b7e83a Mon Sep 17 00:00:00 2001
From: JamieDeMaria
Date: Wed, 13 Nov 2024 15:07:22 -0500
Subject: [PATCH 3/7] fix condition

---
 python_modules/dagster/dagster/_core/instance/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py
index 9adde170fc66f..1628bdf3b13e7 100644
--- a/python_modules/dagster/dagster/_core/instance/__init__.py
+++ b/python_modules/dagster/dagster/_core/instance/__init__.py
@@ -1669,7 +1669,7 @@ def create_reexecuted_run(
                 # if the run was part of a backfill and the backfill is complete, we do not want the
                 # retry to be considered part of the backfill, so remove all backfill-related tags
                 backfill = self.get_backfill(parent_run.tags[BACKFILL_ID_TAG])
-                if backfill.status == BulkActionStatus.REQUESTED:
+                if backfill and backfill.status == BulkActionStatus.REQUESTED:
                     for tag in BACKFILL_TAGS:
                         if parent_run.tags.get(tag) is not None:
                             parent_run_tags[tag] = parent_run.tags[tag]

From 447a3ca7b00a07f02045c70967cfeade31361427 Mon Sep 17 00:00:00 2001
From: JamieDeMaria
Date: Mon, 18 Nov 2024 16:36:42 -0500
Subject: [PATCH 4/7] try test again

---
 .../graphql/test_partition_backfill.py     | 61 -------------------
 .../dagster_tests/daemon_tests/conftest.py | 17 ++++--
 .../daemon_tests/test_backfill.py          | 53 ++++++----------
 3 files changed, 31 insertions(+), 100 deletions(-)

diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
index 5015036056062..8d12e2e0c9f5f 100644
--- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
+++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py
@@ -37,7 +37,6 @@
 from dagster_graphql.client.query import (
     LAUNCH_PARTITION_BACKFILL_MUTATION,
     LAUNCH_PIPELINE_EXECUTION_MUTATION,
-    LAUNCH_PIPELINE_REEXECUTION_MUTATION,
 )
 from dagster_graphql.test.utils import (
     execute_dagster_graphql,
@@ -2310,63 +2309,3 @@ def test_retry_successful_job_backfill(self, graphql_context):
 
         assert retried_backfill.tags.get(PARENT_BACKFILL_ID_TAG) == backfill_id
         assert retried_backfill.tags.get(ROOT_BACKFILL_ID_TAG) == backfill_id
-
-    def test_run_retry_not_part_of_completed_backfill(self, graphql_context):
-        # TestLaunchDaemonBackfillFromFailure::test_run_retry_not_part_of_completed_backfill
-        repository_selector = infer_repository_selector(graphql_context)
-        result = execute_dagster_graphql(
-            graphql_context,
-            LAUNCH_PARTITION_BACKFILL_MUTATION,
-            variables={
-                "backfillParams": {
-                    "selector": {
-                        "repositorySelector": repository_selector,
-                        "partitionSetName": "integers_partition_set",
-                    },
-                    "partitionNames": ["2", "3", "4", "5"],
-                }
-            },
-        )
-
-        assert not result.errors
-        assert result.data
-        assert result.data["launchPartitionBackfill"]["__typename"] == "LaunchBackfillSuccess"
-        backfill_id = result.data["launchPartitionBackfill"]["backfillId"]
-
-        _seed_runs(
-            graphql_context,
-            [
-                (DagsterRunStatus.SUCCESS, "5"),
-                (DagsterRunStatus.SUCCESS, "2"),
-                (DagsterRunStatus.SUCCESS, "3"),
-                (DagsterRunStatus.SUCCESS, "4"),
-                (DagsterRunStatus.SUCCESS, "5"),
-                (DagsterRunStatus.SUCCESS, "2"),
-                (DagsterRunStatus.FAILURE, "3"),
-                (DagsterRunStatus.SUCCESS, "4"),
-            ],
-            backfill_id,
-        )
-
-        backfill = graphql_context.instance.get_backfill(backfill_id)
-        graphql_context.instance.update_backfill(
-            backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
-        )
-
-        failed_run = graphql_context.instance.get_runs(
-            filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE])
-        )[0]
-
-        retry_run_result = execute_dagster_graphql(
-            graphql_context,
-            LAUNCH_PIPELINE_REEXECUTION_MUTATION,
-            variables={
-                "reexecutionParams": {"parentRunId": failed_run.run_id, "strategy": "ALL_STEPS"}
-            },
-        )
-        assert not retry_run_result.errors
-        assert retry_run_result.data
-        assert (
-            retry_run_result.data["launchPipelineReexecution"]["__typename"]
-            == "LaunchPipelineReexecutionSuccess"
-        )
diff --git a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py
index edb4acfd36dbe..712318128e4d0 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/conftest.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/conftest.py
@@ -64,16 +64,23 @@ def workspace_fixture(instance_module_scoped) -> Iterator[WorkspaceProcessContex
         yield workspace_context
 
 
-@pytest.fixture(name="remote_repo", scope="module")
-def remote_repo_fixture(
+@pytest.fixture(name="code_location", scope="module")
+def code_location_fixture(
     workspace_context: WorkspaceProcessContext,
-) -> Iterator[RemoteRepository]:
-    yield cast(
+) -> CodeLocation:
+    return cast(
         CodeLocation,
         next(
             iter(workspace_context.create_request_context().get_code_location_entries().values())
         ).code_location,
-    ).get_repository("the_repo")
+    )
+
+
+@pytest.fixture(name="remote_repo", scope="module")
+def remote_repo_fixture(
+    code_location: CodeLocation,
+) -> Iterator[RemoteRepository]:
+    yield code_location.get_repository("the_repo")
 
 
 def loadable_target_origin(attribute: Optional[str] = None) -> LoadableTargetOrigin:
diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
index cd2c386297dab..a0d817a0a3de9 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -53,11 +53,13 @@
     get_asset_backfill_run_chunk_size,
 )
 from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
+from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
 from dagster._core.remote_representation import (
     InProcessCodeLocationOrigin,
     RemoteRepository,
     RemoteRepositoryOrigin,
 )
+from dagster._core.remote_representation.code_location import CodeLocation
 from dagster._core.storage.compute_log_manager import ComputeIOType
 from dagster._core.storage.dagster_run import (
     IN_PROGRESS_RUN_STATUSES,
@@ -69,6 +71,7 @@
     ASSET_PARTITION_RANGE_END_TAG,
     ASSET_PARTITION_RANGE_START_TAG,
     BACKFILL_ID_TAG,
+    BACKFILL_TAGS,
     MAX_RETRIES_TAG,
     PARTITION_NAME_TAG,
 )
@@ -3169,9 +3172,9 @@ def test_asset_backfill_retries_make_downstreams_runnable(
 def test_run_retry_not_part_of_completed_backfill(
     instance: DagsterInstance,
     workspace_context: WorkspaceProcessContext,
+    code_location: CodeLocation,
     remote_repo: RemoteRepository,
 ):
-    del remote_repo
     backfill_id = "run_retries_backfill"
     partition_keys = static_partitions.get_partition_keys()
     asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
@@ -3215,47 +3218,29 @@ def test_run_retry_not_part_of_completed_backfill(
     assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
 
     # manual retry of a run
-    instance.create_reexecuted_run()
 
     # simulate a retry of a run
     run_to_retry = instance.get_runs()[0]
-    retried_run = create_run_for_test(
-        instance=instance,
-        job_name=run_to_retry.job_name,
-        tags=run_to_retry.tags,
-        root_run_id=run_to_retry.run_id,
-        parent_run_id=run_to_retry.run_id,
+    remote_job = remote_repo.get_full_job(run_to_retry.job_name)
+    retried_run = instance.create_reexecuted_run(
+        parent_run=run_to_retry,
+        code_location=code_location,
+        remote_job=remote_job,
+        strategy=ReexecutionStrategy.ALL_STEPS,
     )
 
-    # since there is a run in progress, the backfill should not be marked as complete, even though
-    # all targeted asset partitions have a completed state
+    for tag in BACKFILL_TAGS:
+        assert tag not in retried_run.tags.keys()
+
+    # Since the backfill is already complete, it should not be processed by the backfill daemon and
+    # should remain in a completed state
     list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
     backfill = instance.get_backfill(backfill_id)
     assert backfill
-    assert backfill.asset_backfill_data
-    assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status()
     assert backfill.status == BulkActionStatus.REQUESTED
 
-    # manually mark the run as successful to show that the backfill will be marked as complete
-    # since there are no in progress runs
-    instance.handle_new_event(
-        EventLogEntry(
-            error_info=None,
-            level="debug",
-            user_message="",
-            run_id=retried_run.run_id,
-            timestamp=time.time(),
-            dagster_event=DagsterEvent(
-                event_type_value=DagsterEventType.RUN_SUCCESS.value,
-                job_name=retried_run.job_name,
-            ),
-        )
-    )
-
-    retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0]
-    assert retried_run.status == DagsterRunStatus.SUCCESS
+    wait_for_all_runs_to_finish(instance, timeout=30)
 
-    list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
-    backfill = instance.get_backfill(backfill_id)
-    assert backfill
-    assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
+    assert retried_run.run_id not in [
+        r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id))
+    ]

From 9f96bb22f19bf8bfca73128976c415f7cf27b6fd Mon Sep 17 00:00:00 2001
From: JamieDeMaria
Date: Wed, 27 Nov 2024 11:13:38 -0500
Subject: [PATCH 5/7] update test

---
 .../dagster/dagster_tests/daemon_tests/test_backfill.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
index a0d817a0a3de9..ad9b4f01cb1e0 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -3217,8 +3217,6 @@ def test_run_retry_not_part_of_completed_backfill(
     assert backfill
     assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
 
-    # manual retry of a run
-
     # simulate a retry of a run
     run_to_retry = instance.get_runs()[0]
     remote_job = remote_repo.get_full_job(run_to_retry.job_name)
@@ -3237,7 +3235,7 @@ def test_run_retry_not_part_of_completed_backfill(
 
     list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
     backfill = instance.get_backfill(backfill_id)
     assert backfill
-    assert backfill.status == BulkActionStatus.REQUESTED
+    assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
 
     wait_for_all_runs_to_finish(instance, timeout=30)

From 55259407cd14b291bdaa7449cf72d6dc886bdcf7 Mon Sep 17 00:00:00 2001
From: JamieDeMaria
Date: Mon, 2 Dec 2024 11:38:47 -0500
Subject: [PATCH 6/7] fix test

---
 .../dagster_tests/daemon_tests/test_backfill.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
index ad9b4f01cb1e0..934a84fe825c8 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -41,6 +41,7 @@
 from dagster._core.definitions.events import AssetKeyPartitionKey
 from dagster._core.definitions.partition import DynamicPartitionsDefinition, PartitionedConfig
 from dagster._core.definitions.selector import (
+    JobSubsetSelector,
     PartitionRangeSelector,
     PartitionsByAssetSelector,
     PartitionsSelector,
@@ -3219,12 +3220,21 @@ def test_run_retry_not_part_of_completed_backfill(
 
     # simulate a retry of a run
     run_to_retry = instance.get_runs()[0]
-    remote_job = remote_repo.get_full_job(run_to_retry.job_name)
+    selector = JobSubsetSelector(
+        location_name=code_location.name,
+        repository_name=remote_repo.name,
+        job_name=run_to_retry.job_name,
+        asset_selection=run_to_retry.asset_selection,
+        op_selection=None,
+    )
+    remote_job = code_location.get_job(selector)
     retried_run = instance.create_reexecuted_run(
         parent_run=run_to_retry,
         code_location=code_location,
         remote_job=remote_job,
         strategy=ReexecutionStrategy.ALL_STEPS,
+        run_config=run_to_retry.run_config,
+        use_parent_run_tags=True,  # ensures that the logic for not copying over backfill tags is tested
     )
 
     for tag in BACKFILL_TAGS:
@@ -3237,8 +3247,6 @@ def test_run_retry_not_part_of_completed_backfill(
     assert backfill
     assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
 
-    wait_for_all_runs_to_finish(instance, timeout=30)
-
     assert retried_run.run_id not in [
         r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id))
     ]

From ef43e6be0a7ce4082da3383ef81dd9ebafcf5c73 Mon Sep 17 00:00:00 2001
From: JamieDeMaria
Date: Tue, 3 Dec 2024 12:34:18 -0500
Subject: [PATCH 7/7] pr comments

---
 .../dagster/dagster/_core/instance/__init__.py | 17 +++++++----------
 .../dagster/dagster/_core/storage/tags.py      |  5 ++---
 2 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py
index 1628bdf3b13e7..fb42ffe3b8c2a 100644
--- a/python_modules/dagster/dagster/_core/instance/__init__.py
+++ b/python_modules/dagster/dagster/_core/instance/__init__.py
@@ -78,7 +78,6 @@
     RESUME_RETRY_TAG,
     ROOT_RUN_ID_TAG,
     TAGS_TO_MAYBE_OMIT_ON_RETRY,
-    TAGS_TO_OMIT_ON_RETRY,
     WILL_RETRY_TAG,
 )
 from dagster._serdes import ConfigurableClass
@@ -1654,17 +1653,15 @@ def create_reexecuted_run(
         parent_run_id = parent_run.run_id
 
         # these can differ from remote_job.tags if tags were added at launch time
-        parent_run_tags = {}
+        parent_run_tags_to_include = {}
         if use_parent_run_tags:
-            parent_run_tags = {
+            parent_run_tags_to_include = {
                 key: val
                 for key, val in parent_run.tags.items()
-                if key not in TAGS_TO_OMIT_ON_RETRY and key not in TAGS_TO_MAYBE_OMIT_ON_RETRY
+                if key not in TAGS_TO_MAYBE_OMIT_ON_RETRY
             }
-            # for all tags in TAGS_TO_MAYBE_OMIT_ON_RETRY, add a condition that determines
-            # whether the tag should be added to the retried run
-
-            # condition for BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG
+            # condition to determine whether to include BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG,
+            # ROOT_BACKFILL_ID_TAG on retried run
             if parent_run.tags.get(BACKFILL_ID_TAG) is not None:
                 # if the run was part of a backfill and the backfill is complete, we do not want the
                 # retry to be considered part of the backfill, so remove all backfill-related tags
@@ -1672,11 +1669,11 @@ def create_reexecuted_run(
                 if backfill and backfill.status == BulkActionStatus.REQUESTED:
                     for tag in BACKFILL_TAGS:
                         if parent_run.tags.get(tag) is not None:
-                            parent_run_tags[tag] = parent_run.tags[tag]
+                            parent_run_tags_to_include[tag] = parent_run.tags[tag]
 
         tags = merge_dicts(
             remote_job.tags,
-            parent_run_tags,
+            parent_run_tags_to_include,
             extra_tags or {},
             {
                 PARENT_RUN_ID_TAG: parent_run_id,
diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py
index 2e67826f4d3b9..7c22457b39ee5 100644
--- a/python_modules/dagster/dagster/_core/storage/tags.py
+++ b/python_modules/dagster/dagster/_core/storage/tags.py
@@ -111,15 +111,14 @@
 BACKFILL_TAGS = {BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG}
 
 
-TAGS_TO_OMIT_ON_RETRY = {
+TAGS_TO_MAYBE_OMIT_ON_RETRY = {
     *RUN_METRIC_TAGS,
     RUN_FAILURE_REASON_TAG,
     WILL_RETRY_TAG,
     AUTO_RETRY_RUN_ID_TAG,
+    *BACKFILL_TAGS,
 }
 
-TAGS_TO_MAYBE_OMIT_ON_RETRY = {*BACKFILL_TAGS}
-
 
 class TagType(Enum):
     # Custom tag provided by a user
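
Illustrative sketch (not part of the patch series): the rule the series converges on in PATCH 7 is that a retried run keeps its parent's backfill tags only while that backfill is still REQUESTED. The standalone helper below restates that rule under stated assumptions; the helper name, the literal tag-key strings, and the use of a plain string in place of BulkActionStatus are illustrative only, and in the real change the logic lives inline in DagsterInstance.create_reexecuted_run, where TAGS_TO_MAYBE_OMIT_ON_RETRY also carries the run-metric and auto-retry tags.

from typing import Mapping, Optional

# Assumed tag keys for illustration only; the real constants live in dagster._core.storage.tags.
BACKFILL_ID_TAG = "dagster/backfill"
PARENT_BACKFILL_ID_TAG = "dagster/parent_backfill_id"
ROOT_BACKFILL_ID_TAG = "dagster/root_backfill_id"
BACKFILL_TAGS = {BACKFILL_ID_TAG, PARENT_BACKFILL_ID_TAG, ROOT_BACKFILL_ID_TAG}


def select_parent_run_tags_for_retry(
    parent_run_tags: Mapping[str, str], backfill_status: Optional[str]
) -> dict:
    # Hypothetical helper: drop the backfill tags up front, then re-add them only if the
    # parent run's backfill is still in progress (status "REQUESTED").
    tags = {key: val for key, val in parent_run_tags.items() if key not in BACKFILL_TAGS}
    if parent_run_tags.get(BACKFILL_ID_TAG) is not None and backfill_status == "REQUESTED":
        for tag in BACKFILL_TAGS:
            if parent_run_tags.get(tag) is not None:
                tags[tag] = parent_run_tags[tag]
    return tags


# A retry of a run from a completed backfill keeps its other tags but loses the backfill tags;
# a retry launched while the backfill is still running keeps them.
assert select_parent_run_tags_for_retry(
    {BACKFILL_ID_TAG: "bf_1", "team": "data"}, backfill_status="COMPLETED_SUCCESS"
) == {"team": "data"}
assert select_parent_run_tags_for_retry(
    {BACKFILL_ID_TAG: "bf_1", "team": "data"}, backfill_status="REQUESTED"
) == {"team": "data", BACKFILL_ID_TAG: "bf_1"}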