Skip to content

Commit

Permalink
update did retry tag to be the automatic retry run id
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 27, 2024
1 parent 8b81eee commit c477655
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dagster._core.snap import snapshot_from_execution_plan
from dagster._core.storage.dagster_run import DagsterRunStatus, RunsFilter
from dagster._core.storage.tags import (
DID_RETRY_TAG,
AUTO_RETRY_RUN_ID,
MAX_RETRIES_TAG,
RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
RETRY_STRATEGY_TAG,
Expand Down Expand Up @@ -347,8 +347,9 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context)
)
)
assert len(instance.run_coordinator.queue()) == 1
first_retry = instance.run_coordinator.queue()[0]
run = instance.get_run_by_id(run.run_id)
assert run.tags.get(DID_RETRY_TAG) == "true"
assert run.tags.get(AUTO_RETRY_RUN_ID) == first_retry.run_id

# doesn't retry again
list(
Expand All @@ -360,7 +361,6 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context)
assert len(instance.run_coordinator.queue()) == 1

# retries once the new run failed
first_retry = instance.run_coordinator.queue()[0]
dagster_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_FAILURE.value,
job_name="foo",
Expand All @@ -387,11 +387,11 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context)
)
)
assert len(instance.run_coordinator.queue()) == 2
second_retry = instance.run_coordinator.queue()[1]
first_retry = instance.get_run_by_id(first_retry.run_id)
assert first_retry.tags.get(DID_RETRY_TAG) == "true"
assert first_retry.tags.get(AUTO_RETRY_RUN_ID) == second_retry.run_id

# doesn't retry a third time
second_retry = instance.run_coordinator.queue()[1]
dagster_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_FAILURE.value,
job_name="foo",
Expand Down Expand Up @@ -419,7 +419,7 @@ def test_consume_new_runs_for_automatic_reexecution(instance, workspace_context)
)
assert len(instance.run_coordinator.queue()) == 2
second_retry = instance.get_run_by_id(second_retry.run_id)
assert second_retry.tags.get(DID_RETRY_TAG) is None
assert second_retry.tags.get(AUTO_RETRY_RUN_ID) is None


def test_daemon_enabled(instance):
Expand Down
9 changes: 7 additions & 2 deletions python_modules/dagster/dagster/_core/storage/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
# This tag is used to indicate that the automatic retry daemon will launch a retry for this run
# If this tag is not on a run, it means the run did not fail or automatic retries is disabled.
WILL_RETRY_TAG = f"{SYSTEM_TAG_PREFIX}will_retry"
DID_RETRY_TAG = f"{SYSTEM_TAG_PREFIX}did_retry"
AUTO_RETRY_RUN_ID_TAG = f"{SYSTEM_TAG_PREFIX}auto_retry_run_id"

MAX_RUNTIME_SECONDS_TAG = f"{SYSTEM_TAG_PREFIX}max_runtime"

Expand Down Expand Up @@ -107,7 +107,12 @@
RUN_METRICS_PYTHON_RUNTIME_TAG = f"{HIDDEN_TAG_PREFIX}python_runtime_metrics"


TAGS_TO_OMIT_ON_RETRY = {*RUN_METRIC_TAGS, RUN_FAILURE_REASON_TAG, WILL_RETRY_TAG, DID_RETRY_TAG}
TAGS_TO_OMIT_ON_RETRY = {
*RUN_METRIC_TAGS,
RUN_FAILURE_REASON_TAG,
WILL_RETRY_TAG,
AUTO_RETRY_RUN_ID_TAG,
}


class TagType(Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus, RunRecord
from dagster._core.storage.tags import (
DID_RETRY_TAG,
AUTO_RETRY_RUN_ID_TAG,
MAX_RETRIES_TAG,
RETRY_NUMBER_TAG,
RETRY_ON_ASSET_OR_OP_FAILURE_TAG,
Expand Down Expand Up @@ -177,7 +177,7 @@ def retry_run(
)

instance.submit_run(new_run.run_id, workspace)
instance.add_run_tags(failed_run.run_id, {DID_RETRY_TAG: "true"})
instance.add_run_tags(failed_run.run_id, {AUTO_RETRY_RUN_ID_TAG: new_run.run_id})


def consume_new_runs_for_automatic_reexecution(
Expand Down

0 comments on commit c477655

Please sign in to comment.