Skip to content

Commit

Permalink
util fn
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Dec 4, 2024
1 parent f810d43 commit 3c089a8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,14 @@
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
AUTO_RETRY_RUN_ID_TAG,
BACKFILL_ID_TAG,
PARTITION_NAME_TAG,
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_run_id, toposort
from dagster._core.workspace.context import BaseWorkspaceRequestContext, IWorkspaceProcessContext
from dagster._serdes import whitelist_for_serdes
from dagster._time import datetime_from_timestamp, get_current_timestamp
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
from dagster._utils.tags import get_boolean_tag_value

if TYPE_CHECKING:
from dagster._core.execution.backfill import PartitionBackfill
Expand Down Expand Up @@ -977,8 +974,7 @@ def backfill_is_complete(
# Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete
if any(
[
get_boolean_tag_value(run.tags.get(WILL_RETRY_TAG), False)
and run.tags.get(AUTO_RETRY_RUN_ID_TAG) is None
run.is_complete_and_waiting_to_retry
for run in instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
Expand Down
21 changes: 21 additions & 0 deletions python_modules/dagster/dagster/_core/storage/dagster_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dagster._core.origin import JobPythonOrigin
from dagster._core.storage.tags import (
ASSET_EVALUATION_ID_TAG,
AUTO_RETRY_RUN_ID_TAG,
AUTOMATION_CONDITION_TAG,
BACKFILL_ID_TAG,
PARENT_RUN_ID_TAG,
Expand All @@ -33,10 +34,12 @@
SCHEDULE_NAME_TAG,
SENSOR_NAME_TAG,
TICK_ID_TAG,
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_run_id
from dagster._record import IHaveNew, record_custom
from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes
from dagster._utils.tags import get_boolean_tag_value

if TYPE_CHECKING:
from dagster._core.definitions.schedule_definition import ScheduleDefinition
Expand Down Expand Up @@ -478,6 +481,24 @@ def is_resume_retry(self) -> bool:
"""bool: If this run was created from retrying another run from the point of failure."""
return self.tags.get(RESUME_RETRY_TAG) == "true"

@property
def is_complete_and_waiting_to_retry(self):
"""Indicates if a run is waiting to be retried by the auto-reexecution system.
Returns True if 1) the run is complete, 2) the run is in a failed state (therefore eligible for retry),
3) the run is marked as needing to be retried, and 4) the retried run has not been launched yet.
Otherwise returns False.
"""
if self.status in NOT_FINISHED_STATUSES:
return False
if self.status != DagsterRunStatus.FAILURE:
return False
will_retry = get_boolean_tag_value(self.tags.get(WILL_RETRY_TAG), default_value=False)
retry_not_launched = self.tags.get(AUTO_RETRY_RUN_ID_TAG) is None
if will_retry:
return retry_not_launched

return False

@property
def previous_run_id(self) -> Optional[str]:
# Compat
Expand Down

0 comments on commit 3c089a8

Please sign in to comment.