From c477655f459079526ca4fccbffd13fb389b5f575 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 25 Nov 2024 15:51:15 -0500 Subject: [PATCH] update did retry tag to be the automatic retry run id --- .../test_auto_run_reexecution.py | 12 ++++++------ python_modules/dagster/dagster/_core/storage/tags.py | 9 +++++++-- .../auto_run_reexecution/auto_run_reexecution.py | 4 ++-- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/integration_tests/test_suites/daemon-test-suite/auto_run_reexecution_tests/test_auto_run_reexecution.py b/integration_tests/test_suites/daemon-test-suite/auto_run_reexecution_tests/test_auto_run_reexecution.py index b2c6d7c9c54f2..95caefc2a9753 100644 --- a/integration_tests/test_suites/daemon-test-suite/auto_run_reexecution_tests/test_auto_run_reexecution.py +++ b/integration_tests/test_suites/daemon-test-suite/auto_run_reexecution_tests/test_auto_run_reexecution.py @@ -10,7 +10,7 @@ from dagster._core.snap import snapshot_from_execution_plan from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter from dagster._core.storage.tags import ( - DID_RETRY_TAG, + AUTO_RETRY_RUN_ID_TAG, MAX_RETRIES_TAG, RETRY_ON_ASSET_OR_OP_FAILURE_TAG, RETRY_STRATEGY_TAG, @@ -347,8 +347,9 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context) ) ) assert len(instance.run_coordinator.queue()) == 1 + first_retry = instance.run_coordinator.queue()[0] run = instance.get_run_by_id(run.run_id) - assert run.tags.get(DID_RETRY_TAG) == "true" + assert run.tags.get(AUTO_RETRY_RUN_ID_TAG) == first_retry.run_id # doesn't retry again list( @@ -360,7 +361,6 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context) assert len(instance.run_coordinator.queue()) == 1 # retries once the new run failed - first_retry = instance.run_coordinator.queue()[0] dagster_event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_FAILURE.value, job_name="foo", @@ -387,11 +387,11 @@ def 
test_consume_new_runs_for_automatic_reexecution(instance, workspace_context) ) ) assert len(instance.run_coordinator.queue()) == 2 + second_retry = instance.run_coordinator.queue()[1] first_retry = instance.get_run_by_id(first_retry.run_id) - assert first_retry.tags.get(DID_RETRY_TAG) == "true" + assert first_retry.tags.get(AUTO_RETRY_RUN_ID_TAG) == second_retry.run_id # doesn't retry a third time - second_retry = instance.run_coordinator.queue()[1] dagster_event = DagsterEvent( event_type_value=DagsterEventType.PIPELINE_FAILURE.value, job_name="foo", @@ -419,7 +419,7 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context) ) assert len(instance.run_coordinator.queue()) == 2 second_retry = instance.get_run_by_id(second_retry.run_id) - assert second_retry.tags.get(DID_RETRY_TAG) is None + assert second_retry.tags.get(AUTO_RETRY_RUN_ID_TAG) is None def test_daemon_enabled(instance): diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index f6ae64730a953..122b7b9fbba14 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -61,7 +61,7 @@ # This tag is used to indicate that the automatic retry daemon will launch a retry for this run # If this tag is not on a run, it means the run did not fail or automatic retries is disabled. 
WILL_RETRY_TAG = f"{SYSTEM_TAG_PREFIX}will_retry" -DID_RETRY_TAG = f"{SYSTEM_TAG_PREFIX}did_retry" +AUTO_RETRY_RUN_ID_TAG = f"{SYSTEM_TAG_PREFIX}auto_retry_run_id" MAX_RUNTIME_SECONDS_TAG = f"{SYSTEM_TAG_PREFIX}max_runtime" @@ -107,7 +107,12 @@ RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics" -TAGS_TO_OMIT_ON_RETRY = {*RUN_METRIC_TAGS, RUN_FAILURE_REASON_TAG, WILL_RETRY_TAG, DID_RETRY_TAG} +TAGS_TO_OMIT_ON_RETRY = { + *RUN_METRIC_TAGS, + RUN_FAILURE_REASON_TAG, + WILL_RETRY_TAG, + AUTO_RETRY_RUN_ID_TAG, +} class TagType(Enum): diff --git a/python_modules/dagster/dagster/_daemon/auto_run_reexecution/auto_run_reexecution.py b/python_modules/dagster/dagster/_daemon/auto_run_reexecution/auto_run_reexecution.py index ad9bb5c056bc9..f347ed4829c89 100644 --- a/python_modules/dagster/dagster/_daemon/auto_run_reexecution/auto_run_reexecution.py +++ b/python_modules/dagster/dagster/_daemon/auto_run_reexecution/auto_run_reexecution.py @@ -8,7 +8,7 @@ from dagster._core.instance import DagsterInstance from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus, RunRecord from dagster._core.storage.tags import ( - DID_RETRY_TAG, + AUTO_RETRY_RUN_ID_TAG, MAX_RETRIES_TAG, RETRY_NUMBER_TAG, RETRY_ON_ASSET_OR_OP_FAILURE_TAG, @@ -177,7 +177,7 @@ def retry_run( ) instance.submit_run(new_run.run_id, workspace) - instance.add_run_tags(failed_run.run_id, {DID_RETRY_TAG: "true"}) + instance.add_run_tags(failed_run.run_id, {AUTO_RETRY_RUN_ID_TAG: new_run.run_id}) def consume_new_runs_for_automatic_reexecution(