Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backfill daemon run retries 1/n] update how we determine backfill completion to account for retried runs #25771

Merged
merged 12 commits into from
Dec 5, 2024
80 changes: 72 additions & 8 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from dagster._core.storage.dagster_run import (
CANCELABLE_RUN_STATUSES,
IN_PROGRESS_RUN_STATUSES,
NOT_FINISHED_STATUSES,
DagsterRunStatus,
RunsFilter,
)
Expand Down Expand Up @@ -166,11 +167,12 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack
def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool):
    # Return a copy of this record with only requested_runs_for_target_roots replaced
    # (NamedTuple-style _replace; the original instance is left unmodified).
    return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots)

def is_complete(self) -> bool:
def all_targeted_partitions_have_materialization_status(self) -> bool:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renaming this since it is no longer the only thing used to determine backfill completion

"""The asset backfill is complete when all runs to be requested have finished (success,
failure, or cancellation). Since the AssetBackfillData object stores materialization states
per asset partition, the daemon continues to update the backfill data until all runs have
finished in order to display the final partition statuses in the UI.
per asset partition, we can use the materialization states and whether any runs for the backfill are
not finished to determine if the backfill is complete. We want the daemon to continue to update
the backfill data until all runs have finished in order to display the final partition statuses in the UI.
"""
return (
(
Expand Down Expand Up @@ -927,6 +929,67 @@ def _check_validity_and_deserialize_asset_backfill_data(
return asset_backfill_data


def backfill_is_complete(
    backfill_id: str,
    backfill_data: AssetBackfillData,
    instance: DagsterInstance,
    logger: logging.Logger,
) -> bool:
    """Determine whether an asset backfill has fully completed.

    A backfill is complete when:
    1. All asset partitions in the target subset have a materialization state
       (successful, failed, downstream of a failed partition).
    2. There are no in progress runs for the backfill.
    3. There are no failed runs that will result in an automatic retry, but have
       not yet been retried.

    Condition 1 ensures that for each asset partition we have attempted to materialize it
    or have determined we cannot materialize it because of a failed dependency. Condition 2
    ensures that no retries of failed runs are in progress. Condition 3 guards against a
    race condition where a failed run could be automatically retried but it was not added
    into the queue in time to be caught by condition 2.

    Since the AssetBackfillData object stores materialization states per asset partition,
    we want to ensure the daemon continues to update the backfill data until all runs have
    finished in order to display the final partition statuses in the UI.

    Args:
        backfill_id: ID of the backfill; runs are located via the BACKFILL_ID_TAG.
        backfill_data: Per-asset-partition materialization bookkeeping for the backfill.
        instance: Instance whose run storage is queried for the backfill's runs.
        logger: Logger used to report why the backfill is still in progress.

    Returns:
        True when all three completion conditions hold, False otherwise.
    """
    # Condition 1 - if any asset partitions in the target subset do not have a
    # materialization state, the backfill is not complete.
    if not backfill_data.all_targeted_partitions_have_materialization_status():
        logger.info(
            "Not all targeted asset partitions have a materialization status. Backfill is still in progress."
        )
        return False

    # Condition 2 - if there are in progress runs for the backfill, the backfill is not
    # complete. limit=1 since we only need to know whether any such run exists.
    if instance.get_run_ids(
        filters=RunsFilter(
            statuses=NOT_FINISHED_STATUSES,
            tags={BACKFILL_ID_TAG: backfill_id},
        ),
        limit=1,
    ):
        logger.info("Backfill has in progress runs. Backfill is still in progress.")
        return False

    # Condition 3 - if there are runs that will be retried, but have not yet been retried,
    # the backfill is not complete. A generator lets any() short-circuit on the first match
    # instead of materializing the full list of flags.
    if any(
        run.is_complete_and_waiting_to_retry
        for run in instance.get_runs(
            filters=RunsFilter(
                tags={BACKFILL_ID_TAG: backfill_id},
                statuses=[DagsterRunStatus.FAILURE],
            )
        )
    ):
        logger.info(
            "Some runs for the backfill will be retried, but have not been launched. Backfill is still in progress."
        )
        return False
    return True


def execute_asset_backfill_iteration(
backfill: "PartitionBackfill",
logger: logging.Logger,
Expand Down Expand Up @@ -1045,11 +1108,12 @@ def execute_asset_backfill_iteration(

updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph)

if updated_backfill_data.is_complete():
# The asset backfill is complete when all runs to be requested have finished (success,
# failure, or cancellation). Since the AssetBackfillData object stores materialization states
# per asset partition, the daemon continues to update the backfill data until all runs have
# finished in order to display the final partition statuses in the UI.
if backfill_is_complete(
backfill_id=backfill.backfill_id,
backfill_data=updated_backfill_data,
instance=instance,
logger=logger,
):
if (
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
> 0
Expand Down
21 changes: 21 additions & 0 deletions python_modules/dagster/dagster/_core/storage/dagster_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dagster._core.origin import JobPythonOrigin
from dagster._core.storage.tags import (
ASSET_EVALUATION_ID_TAG,
AUTO_RETRY_RUN_ID_TAG,
AUTOMATION_CONDITION_TAG,
BACKFILL_ID_TAG,
PARENT_RUN_ID_TAG,
Expand All @@ -33,10 +34,12 @@
SCHEDULE_NAME_TAG,
SENSOR_NAME_TAG,
TICK_ID_TAG,
WILL_RETRY_TAG,
)
from dagster._core.utils import make_new_run_id
from dagster._record import IHaveNew, record_custom
from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes
from dagster._utils.tags import get_boolean_tag_value

if TYPE_CHECKING:
from dagster._core.definitions.schedule_definition import ScheduleDefinition
Expand Down Expand Up @@ -478,6 +481,24 @@ def is_resume_retry(self) -> bool:
"""bool: If this run was created from retrying another run from the point of failure."""
return self.tags.get(RESUME_RETRY_TAG) == "true"

@property
def is_complete_and_waiting_to_retry(self) -> bool:
    """Indicates if a run is waiting to be retried by the auto-reexecution system.

    Returns True if 1) the run is in a FAILURE state (and is therefore both complete
    and eligible for retry), 2) the run is marked as needing to be retried via the
    WILL_RETRY_TAG, and 3) the retried run has not been launched yet (no
    AUTO_RETRY_RUN_ID_TAG present). Otherwise returns False.
    """
    # A FAILURE status is by definition finished, so the original separate check
    # against NOT_FINISHED_STATUSES was redundant; this single check subsumes it.
    if self.status != DagsterRunStatus.FAILURE:
        return False
    will_retry = get_boolean_tag_value(self.tags.get(WILL_RETRY_TAG), default_value=False)
    # Presumably the retry machinery stamps AUTO_RETRY_RUN_ID_TAG once the retry run
    # is launched — TODO confirm against the auto-reexecution daemon.
    retry_not_launched = self.tags.get(AUTO_RETRY_RUN_ID_TAG) is None
    return will_retry and retry_not_launched

@property
def previous_run_id(self) -> Optional[str]:
# Compat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
AssetBackfillData,
AssetBackfillIterationResult,
AssetBackfillStatus,
backfill_is_complete,
execute_asset_backfill_iteration_inner,
get_canceling_asset_backfill_iteration_data,
)
Expand Down Expand Up @@ -618,7 +619,12 @@ def run_backfill_to_completion(
evaluation_time=backfill_data.backfill_start_datetime,
)

while not backfill_data.is_complete():
while not backfill_is_complete(
backfill_id=backfill_id,
backfill_data=backfill_data,
instance=instance,
logger=logging.getLogger("fake_logger"),
):
iteration_count += 1

result1 = execute_asset_backfill_iteration_consume_generator(
Expand All @@ -628,7 +634,6 @@ def run_backfill_to_completion(
instance=instance,
)

# iteration_count += 1
assert result1.backfill_data != backfill_data

instance_queryer = _get_instance_queryer(
Expand Down
24 changes: 16 additions & 8 deletions python_modules/dagster/dagster_tests/daemon_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import tempfile
from typing import Iterator, Optional, cast

import pytest
Expand All @@ -21,15 +22,22 @@

@pytest.fixture(name="instance_module_scoped", scope="module")
def instance_module_scoped_fixture() -> Iterator[DagsterInstance]:
with instance_for_test(
overrides={
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
with tempfile.TemporaryDirectory() as temp_dir:
with instance_for_test(
overrides={
"run_launcher": {
"module": "dagster._core.launcher.sync_in_memory_run_launcher",
"class": "SyncInMemoryRunLauncher",
},
"event_log_storage": {
"module": "dagster._core.storage.event_log",
"class": "ConsolidatedSqliteEventLogStorage",
"config": {"base_dir": temp_dir},
},
"run_retries": {"enabled": True},
}
}
) as instance:
yield instance
) as instance:
yield instance


@pytest.fixture(name="instance", scope="function")
Expand Down
Loading