Skip to content

Commit

Permalink
[backfill daemon run retries 1/n] update how we determine backfill co…
Browse files Browse the repository at this point in the history
…mpletion to account for retried runs (#25771)

## Summary & Motivation
The backfill daemon doesn't account for run retries. See
dagster-io/internal#12460 for more
context. We've decided that we want the daemon to account for automatic
and manual retries of runs that occur while the backfill is still in
progress. This requires two changes: ensuring the backfill isn't marked
completed if there is an in progress run or a failed run that will be
automatically retried; and updating the daemon to take the results of
retried runs into account when deciding what partitions to materialize
in the next iteration.

This PR addresses the first point, ensuring the backfill isn't marked
completed if there is an in progress run or a failed run that will be
automatically retried.

Currently a backfill is marked complete when all targeted asset
partitions are in a terminal state (successfully materialized, failed,
or downstream of a failed partition). Since failed runs may be retried,
there is a case where all asset partitions are in a terminal state, but
there is a retry in progress that could change the state of some asset
partitions. This means that if there are any runs in progress for the
partition we need to wait for them to complete before marking the
backfill complete.

Additionally, we need to account for a race condition where a failed run
may have a retry automatically launched for it, but the daemon marks the
backfill complete before the retried run is queued. This PR adds an
additional check to ensure that no failed runs are about to be retried.

## How I Tested These Changes
new unit tests

manually ran a backfill with automatic run retries configured and saw
that the backfill didn't complete until all automatic retries were
complete
  • Loading branch information
jamiedemaria authored and cmpadden committed Dec 5, 2024
1 parent 5612592 commit 1bb02f2
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 18 deletions.
80 changes: 72 additions & 8 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from dagster._core.storage.dagster_run import (
CANCELABLE_RUN_STATUSES,
IN_PROGRESS_RUN_STATUSES,
NOT_FINISHED_STATUSES,
DagsterRunStatus,
RunsFilter,
)
Expand Down Expand Up @@ -166,11 +167,12 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack
def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool):
return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots)

def is_complete(self) -> bool:
def all_targeted_partitions_have_materialization_status(self) -> bool:
"""The asset backfill is complete when all runs to be requested have finished (success,
failure, or cancellation). Since the AssetBackfillData object stores materialization states
per asset partition, the daemon continues to update the backfill data until all runs have
finished in order to display the final partition statuses in the UI.
per asset partition, we can use the materialization states and whether any runs for the backfill are
not finished to determine if the backfill is complete. We want the daemon to continue to update
the backfill data until all runs have finished in order to display the final partition statuses in the UI.
"""
return (
(
Expand Down Expand Up @@ -927,6 +929,67 @@ def _check_validity_and_deserialize_asset_backfill_data(
return asset_backfill_data


def backfill_is_complete(
backfill_id: str,
backfill_data: AssetBackfillData,
instance: DagsterInstance,
logger: logging.Logger,
):
"""A backfill is complete when:
1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition).
2. there are no in progress runs for the backfill.
3. there are no failed runs that will result in an automatic retry, but have not yet been retried.
Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we
cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are
in progress. Condition 3 guards against a race condition where a failed run could be automatically retried
but it was not added into the queue in time to be caught by condition 2.
Since the AssetBackfillData object stores materialization states per asset partition, we want to ensure the
daemon continues to update the backfill data until all runs have finished in order to display the
final partition statuses in the UI.
"""
# Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill
# is not complete
if not backfill_data.all_targeted_partitions_have_materialization_status():
logger.info(
"Not all targeted asset partitions have a materialization status. Backfill is still in progress."
)
return False
# Condition 2 - if there are in progress runs for the backfill, the backfill is not complete
if (
len(
instance.get_run_ids(
filters=RunsFilter(
statuses=NOT_FINISHED_STATUSES,
tags={BACKFILL_ID_TAG: backfill_id},
),
limit=1,
)
)
> 0
):
logger.info("Backfill has in progress runs. Backfill is still in progress.")
return False
# Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete
if any(
[
run.is_complete_and_waiting_to_retry
for run in instance.get_runs(
filters=RunsFilter(
tags={BACKFILL_ID_TAG: backfill_id},
statuses=[DagsterRunStatus.FAILURE],
)
)
]
):
logger.info(
"Some runs for the backfill will be retried, but have not been launched. Backfill is still in progress."
)
return False
return True


def execute_asset_backfill_iteration(
backfill: "PartitionBackfill",
logger: logging.Logger,
Expand Down Expand Up @@ -1045,11 +1108,12 @@ def execute_asset_backfill_iteration(

updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph)

if updated_backfill_data.is_complete():
# The asset backfill is complete when all runs to be requested have finished (success,
# failure, or cancellation). Since the AssetBackfillData object stores materialization states
# per asset partition, the daemon continues to update the backfill data until all runs have
# finished in order to display the final partition statuses in the UI.
if backfill_is_complete(
backfill_id=backfill.backfill_id,
backfill_data=updated_backfill_data,
instance=instance,
logger=logger,
):
if (
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
> 0
Expand Down
21 changes: 21 additions & 0 deletions python_modules/dagster/dagster/_core/storage/dagster_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dagster._core.origin import JobPythonOrigin
from dagster._core.storage.tags import (
ASSET_EVALUATION_ID_TAG,
AUTO_RETRY_RUN_ID_TAG,
AUTOMATION_CONDITION_TAG,
BACKFILL_ID_TAG,
PARENT_RUN_ID_TAG,
Expand All @@ -33,10 +34,12 @@
SCHEDULE_NAME_TAG,
SENSOR_NAME_TAG,
TICK_ID_TAG,
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_run_id
from dagster._record import IHaveNew, record_custom
from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes
from dagster._utils.tags import get_boolean_tag_value

if TYPE_CHECKING:
from dagster._core.definitions.schedule_definition import ScheduleDefinition
Expand Down Expand Up @@ -478,6 +481,24 @@ def is_resume_retry(self) -> bool:
"""bool: If this run was created from retrying another run from the point of failure."""
return self.tags.get(RESUME_RETRY_TAG) == "true"

@property
def is_complete_and_waiting_to_retry(self):
"""Indicates if a run is waiting to be retried by the auto-reexecution system.
Returns True if 1) the run is complete, 2) the run is in a failed state (therefore eligible for retry),
3) the run is marked as needing to be retried, and 4) the retried run has not been launched yet.
Otherwise returns False.
"""
if self.status in NOT_FINISHED_STATUSES:
return False
if self.status != DagsterRunStatus.FAILURE:
return False
will_retry = get_boolean_tag_value(self.tags.get(WILL_RETRY_TAG), default_value=False)
retry_not_launched = self.tags.get(AUTO_RETRY_RUN_ID_TAG) is None
if will_retry:
return retry_not_launched

return False

@property
def previous_run_id(self) -> Optional[str]:
# Compat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
AssetBackfillData,
AssetBackfillIterationResult,
AssetBackfillStatus,
backfill_is_complete,
execute_asset_backfill_iteration_inner,
get_canceling_asset_backfill_iteration_data,
)
Expand Down Expand Up @@ -618,7 +619,12 @@ def run_backfill_to_completion(
evaluation_time=backfill_data.backfill_start_datetime,
)

while not backfill_data.is_complete():
while not backfill_is_complete(
backfill_id=backfill_id,
backfill_data=backfill_data,
instance=instance,
logger=logging.getLogger("fake_logger"),
):
iteration_count += 1

result1 = execute_asset_backfill_iteration_consume_generator(
Expand All @@ -628,7 +634,6 @@ def run_backfill_to_completion(
instance=instance,
)

# iteration_count += 1
assert result1.backfill_data != backfill_data

instance_queryer = _get_instance_queryer(
Expand Down
24 changes: 16 additions & 8 deletions python_modules/dagster/dagster_tests/daemon_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import tempfile
from typing import Iterator, Optional, cast

import pytest
Expand All @@ -21,15 +22,22 @@

@pytest.fixture(name="instance_module_scoped", scope="module")
def instance_module_scoped_fixture() -> Iterator[DagsterInstance]:
with instance_for_test(
overrides={
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
with tempfile.TemporaryDirectory() as temp_dir:
with instance_for_test(
overrides={
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
},
"event_log_storage": {
"module": "dagster._core.storage.event_log",
"class": "ConsolidatedSqliteEventLogStorage",
"config": {"base_dir": temp_dir},
},
"run_retries": {"enabled": True},
}
}
) as instance:
yield instance
) as instance:
yield instance


@pytest.fixture(name="instance", scope="function")
Expand Down
Loading

0 comments on commit 1bb02f2

Please sign in to comment.