-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[backfill daemon run retries 1/n] update how we determine backfill completion to account for retried runs #25771
Changes from all commits
fa99a25
382de7e
d1f0cdb
2709f89
14d2db6
932a447
6017c7f
247e4f4
ef8336f
e9ec931
edfa1a3
701562d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,7 @@ | |
from dagster._core.storage.dagster_run import ( | ||
CANCELABLE_RUN_STATUSES, | ||
IN_PROGRESS_RUN_STATUSES, | ||
NOT_FINISHED_STATUSES, | ||
DagsterRunStatus, | ||
RunsFilter, | ||
) | ||
|
@@ -166,11 +167,12 @@ def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBack | |
def with_requested_runs_for_target_roots(self, requested_runs_for_target_roots: bool): | ||
return self._replace(requested_runs_for_target_roots=requested_runs_for_target_roots) | ||
|
||
def is_complete(self) -> bool: | ||
def all_targeted_partitions_have_materialization_status(self) -> bool: | ||
"""The asset backfill is complete when all runs to be requested have finished (success, | ||
failure, or cancellation). Since the AssetBackfillData object stores materialization states | ||
per asset partition, the daemon continues to update the backfill data until all runs have | ||
finished in order to display the final partition statuses in the UI. | ||
per asset partition, we can use the materialization states and whether any runs for the backfill are | ||
not finished to determine if the backfill is complete. We want the daemon to continue to update | ||
the backfill data until all runs have finished in order to display the final partition statuses in the UI. | ||
""" | ||
return ( | ||
( | ||
|
@@ -927,6 +929,67 @@ def _check_validity_and_deserialize_asset_backfill_data( | |
return asset_backfill_data | ||
|
||
|
||
def backfill_is_complete( | ||
backfill_id: str, | ||
backfill_data: AssetBackfillData, | ||
instance: DagsterInstance, | ||
logger: logging.Logger, | ||
): | ||
"""A backfill is complete when: | ||
1. all asset partitions in the target subset have a materialization state (successful, failed, downstream of a failed partition). | ||
2. there are no in progress runs for the backfill. | ||
3. there are no failed runs that will result in an automatic retry, but have not yet been retried. | ||
|
||
Condition 1 ensures that for each asset partition we have attempted to materialize it or have determined we | ||
cannot materialize it because of a failed dependency. Condition 2 ensures that no retries of failed runs are | ||
in progress. Condition 3 guards against a race condition where a failed run could be automatically retried | ||
but it was not added into the queue in time to be caught by condition 2. | ||
|
||
Since the AssetBackfillData object stores materialization states per asset partition, we want to ensure the | ||
daemon continues to update the backfill data until all runs have finished in order to display the | ||
final partition statuses in the UI. | ||
""" | ||
# Condition 1 - if any asset partitions in the target subset do not have a materialization state, the backfill | ||
# is not complete | ||
if not backfill_data.all_targeted_partitions_have_materialization_status(): | ||
logger.info( | ||
"Not all targeted asset partitions have a materialization status. Backfill is still in progress." | ||
) | ||
return False | ||
# Condition 2 - if there are in progress runs for the backfill, the backfill is not complete | ||
if ( | ||
len( | ||
instance.get_run_ids( | ||
filters=RunsFilter( | ||
statuses=NOT_FINISHED_STATUSES, | ||
tags={BACKFILL_ID_TAG: backfill_id}, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Comment: This is one of those magical tags that the run storage can use to make the query always efficient, right? I assume so, since job backfills use it too (thinking of [reference lost in extraction]).
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Reply: Yeah, we special-case the backfill_id tag for Plus so that it doesn't use the tags table. |
||
), | ||
limit=1, | ||
) | ||
) | ||
> 0 | ||
): | ||
logger.info("Backfill has in progress runs. Backfill is still in progress.") | ||
return False | ||
# Condition 3 - if there are runs that will be retried, but have not yet been retried, the backfill is not complete | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Comment: Does this still behave reasonably on old versions of user code that are not necessarily setting WILL_RETRY_TAG? I think in that case we would just ignore this condition, right? (And potentially finish the backfill "early", like we were doing before.)
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Reply: Maybe I made a bad assumption, but I figured that the version of the backfill daemon would be the same as the version of the auto-retry daemon. The auto-retry daemon will add the will_retry tag if it wasn't added when the run failure event was handled, which made me think we could rely on this being set.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Reply: But yes, in the case that the will_retry tag isn't getting added to runs, the runs will have [remainder of comment lost in extraction] |
||
if any( | ||
[ | ||
run.is_complete_and_waiting_to_retry | ||
for run in instance.get_runs( | ||
filters=RunsFilter( | ||
tags={BACKFILL_ID_TAG: backfill_id}, | ||
statuses=[DagsterRunStatus.FAILURE], | ||
) | ||
) | ||
] | ||
): | ||
logger.info( | ||
"Some runs for the backfill will be retried, but have not been launched. Backfill is still in progress." | ||
) | ||
return False | ||
return True | ||
|
||
|
||
def execute_asset_backfill_iteration( | ||
backfill: "PartitionBackfill", | ||
logger: logging.Logger, | ||
|
@@ -1045,11 +1108,12 @@ def execute_asset_backfill_iteration( | |
|
||
updated_backfill_data = updated_backfill.get_asset_backfill_data(asset_graph) | ||
|
||
if updated_backfill_data.is_complete(): | ||
# The asset backfill is complete when all runs to be requested have finished (success, | ||
# failure, or cancellation). Since the AssetBackfillData object stores materialization states | ||
# per asset partition, the daemon continues to update the backfill data until all runs have | ||
# finished in order to display the final partition statuses in the UI. | ||
if backfill_is_complete( | ||
backfill_id=backfill.backfill_id, | ||
backfill_data=updated_backfill_data, | ||
instance=instance, | ||
logger=logger, | ||
): | ||
if ( | ||
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets | ||
> 0 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renaming this (`is_complete` → `all_targeted_partitions_have_materialization_status`), since it is no longer the only thing used to determine backfill completion.