[backfill daemon run retries 2/n] backfill daemon incorporates retried runs when launching new runs #25853
Conversation
```diff
 for asset_key in failed_asset_keys:
-    result.extend(
-        asset_graph.get_partitions_in_range(
-            asset_key, partition_range, instance_queryer
-        )
-    )
+    asset_partition_candidates = asset_graph.get_partitions_in_range(
+        asset_key, partition_range, instance_queryer
+    )
```
There appears to be a scoping issue with `asset_partition_candidates`. The code creates these candidates for partition ranges but never uses them, because the `result.extend(asset_partitions_still_failed)` call is outside both branches of the if/else. To fix this:

- Move the `asset_partition_candidates` assignment inside the `for` loop in the first branch
- Move `result.extend(asset_partitions_still_failed)` inside each branch of the if/else

This ensures both partition ranges and single partitions are properly filtered against the materialized subset before being added to the result.
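A minimal sketch of the suggested structure, using the names from the diff and the comment above (`single_partition_candidates` and the function wrapper are assumptions for illustration, not the actual Dagster code):

```python
def get_still_failed_partitions(
    asset_graph,
    instance_queryer,
    failed_asset_keys,
    partition_range,
    single_partition_candidates,
    materialized_subset,
):
    """Sketch: filter candidates against the materialized subset inside
    each branch, then extend the result inside that same branch."""
    result = []
    if partition_range is not None:
        for asset_key in failed_asset_keys:
            # The assignment now lives inside the for loop of the first branch.
            asset_partition_candidates = asset_graph.get_partitions_in_range(
                asset_key, partition_range, instance_queryer
            )
            asset_partitions_still_failed = [
                candidate
                for candidate in asset_partition_candidates
                if candidate not in materialized_subset
            ]
            # extend() is now inside the branch rather than after the if/else.
            result.extend(asset_partitions_still_failed)
    else:
        asset_partitions_still_failed = [
            candidate
            for candidate in single_partition_candidates
            if candidate not in materialized_subset
        ]
        result.extend(asset_partitions_still_failed)
    return result
```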
Spotted by Graphite Reviewer
```diff
     asset_graph,
 )
 updated_backfill_data = AssetBackfillData(
     target_subset=asset_backfill_data.target_subset,
     latest_storage_id=asset_backfill_data.latest_storage_id,
     requested_runs_for_target_roots=asset_backfill_data.requested_runs_for_target_roots,
     materialized_subset=updated_materialized_subset,
-    failed_and_downstream_subset=asset_backfill_data.failed_and_downstream_subset
-    | failed_subset,
+    failed_and_downstream_subset=failed_subset,
```
Want to confirm that this is a safe change to make. From my reading of `_get_failed_asset_partitions`, it returns the full list of failed partitions, not a list of partitions that failed since the last tick. So in the version of this code before this PR, the ORing of the two subsets is a no-op, since `asset_backfill_data.failed_and_downstream_subset` would be a subset of `failed_subset`.

With the change to have `_get_failed_asset_partitions` account for retries, ORing with `asset_backfill_data.failed_and_downstream_subset` would result in inaccurate data: a failed partition in `asset_backfill_data.failed_and_downstream_subset` could have been successfully retried and no longer be in `failed_subset`, but would still be included because of the OR operation.
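A toy illustration of the concern, using plain Python sets in place of `AssetGraphSubset` (the partition keys are hypothetical):

```python
# Before retries: _get_failed_asset_partitions returns every failed
# partition, so the previous subset is always contained in the new one
# and the OR changes nothing.
previous_failed = {"2024-01-01", "2024-01-02"}
failed_subset = {"2024-01-01", "2024-01-02", "2024-01-03"}
assert previous_failed | failed_subset == failed_subset  # no-op

# After retries: "2024-01-02" was retried successfully, so it is gone
# from the retry-aware failed subset, but the OR would resurrect it.
failed_subset_retry_aware = {"2024-01-01", "2024-01-03"}
stale = previous_failed | failed_subset_retry_aware
assert "2024-01-02" in stale  # incorrectly still marked as failed
```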
Ah ok, I forgot that we do the second function to get the downstream of the failed subset, so this isn't a simple replacement since `failed_subset` doesn't include downstream assets. I will update.
I think @clairelin135 is more familiar with the ins and outs of this particular codepath than I am, but I do have one question.
```diff
 updated_backfill_data = AssetBackfillData(
     target_subset=asset_backfill_data.target_subset,
     latest_storage_id=asset_backfill_data.latest_storage_id,
     requested_runs_for_target_roots=asset_backfill_data.requested_runs_for_target_roots,
     materialized_subset=updated_materialized_subset,
-    failed_and_downstream_subset=asset_backfill_data.failed_and_downstream_subset
-    | failed_subset,
+    failed_and_downstream_subset=updated_failed_subset,
```
I'm not super familiar with this code, but how is the 'and downstream' part of `failed_and_downstream_subset` affected by the retry case? If we add downstream assets to the failed list as soon as we detect a partition fails, do we need to remove them if that run is retried? Or are those added at the very end, after we have decided that the backfill is totally done and failed?
So this specifically is for the case when the backfill has been canceled.

In the normal code path, what we do is:

- find which partitions failed
- do a BFS search to find all downstream partitions of the failed partitions
- set that as the `failed_and_downstream_subset`
- when determining whether we can launch a run for a partition, if it is in the `failed_and_downstream_subset` we know we cannot launch the run

We run this process from scratch on each backfill iteration, so we can handle run retries in the first step by checking whether each failed partition has been retried successfully; if so, we remove it from the list of failed partitions. Then all the following steps produce the correct results (see the sketch below).
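A rough sketch of that per-iteration recomputation with the retry-aware first step (all names here are illustrative stand-ins, not the actual Dagster internals):

```python
from collections import deque


def recompute_failed_and_downstream(asset_graph, failed_runs, materialized_subset):
    # Step 1: collect failed partitions, dropping any that a later
    # (manual or automatic) retry has since materialized successfully.
    failed = {
        partition
        for run in failed_runs
        for partition in run.asset_partitions
        if partition not in materialized_subset
    }

    # Step 2: BFS over the asset graph to collect everything downstream
    # of a failed partition; none of these may be launched this iteration.
    failed_and_downstream = set(failed)
    frontier = deque(failed)
    while frontier:
        partition = frontier.popleft()
        for child in asset_graph.downstream_of(partition):
            if child not in failed_and_downstream:
                failed_and_downstream.add(child)
                frontier.append(child)
    return failed_and_downstream
```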
For this canceling case it's a bit different. The way it works now is that we take the previous `failed_and_downstream_subset` and just add any partitions that have failed (effectively this just adds the partitions that failed since the last tick). We don't recalculate the downstream subset for any newly failed partitions (I'm not sure why, maybe @clairelin135 knows).

If we leave the code as is, we could have a case where a failed partition was retried and that retry succeeded since the previous tick. The partition would be in the `failed_and_downstream_subset` from the previous tick, but is now successful. We want to make sure the right status is reported, so we need to remove it from the final set of failed partitions, and we can do that by removing the `materialized_subset` from the `failed_and_downstream_subset`.

Another option would be to do the process we do in a normal tick and generate the `failed_and_downstream_subset` from scratch rather than take this additive approach. I'm not sure why that wasn't done in the first place, though, so I didn't make that change.
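In subset terms, the fix described above is roughly the following (a sketch using the names from the diff; it assumes `AssetGraphSubset` supports `-` for set difference alongside the `|` union shown in the diff):

```python
# Keep the cheap additive update, but subtract anything that has since
# materialized, so a successfully retried partition is no longer
# reported as failed when the backfill finishes canceling.
updated_failed_subset = (
    asset_backfill_data.failed_and_downstream_subset | failed_subset
) - updated_materialized_subset
```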
> we don't recalculate the downstream subset for any newly failed partitions

I think we added this for performance reasons: if a backfill is canceling, we do the cheapest thing possible, which is to mark every requested partition as failed. We don't bother doing the graph traversal to mark their downstreams as failed.
The changes here in the canceling iteration look good to me
The changes here look good to me.
Initially I was wondering if we could avoid moving a partition to the failed subset until all of its retries have concluded, to simplify the state transitions here. But since we're incorporating manual retries, we need to be able to move partitions and their downstreams between the failed/materialized subsets regardless.
```diff
@@ -1274,16 +1274,29 @@ def get_canceling_asset_backfill_iteration_data(
     )

     failed_subset = AssetGraphSubset.from_asset_partition_set(
-        set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)),
+        set(
```
We should make sure that if the backfill is in a canceling state, retried runs are not included in the backfill
Summary & Motivation
The backfill daemon doesn't account for run retries. See https://github.com/dagster-io/internal/discussions/12460 for more context. We've decided that we want the daemon to account for automatic and manual retries of runs that occur while the backfill is still in progress. This requires two changes: ensuring the backfill isn't marked completed if there is an in-progress run or a failed run that will be automatically retried, and updating the daemon to take the results of retried runs into account when deciding what partitions to materialize in the next iteration.
This PR addresses the second point, updating the backfill daemon to take the results of retried runs into account when deciding what partitions to materialize in the next iteration.
Currently, the backfill gets a list of the successfully materialized assets for the backfill by looking at the materialization events for each asset. It determines which assets failed by looking at the failed runs launched by the backfill and pulling the asset partition information from those runs. Any assets downstream of those failed assets will not be launched by the backfill.

Now that we want the backfill daemon to account for run retries, we need to slightly modify this logic. Since a run can be retried, it is possible for an asset to have a successful materialization AND be a failed asset in a failed run. This means that when we determine which assets are failed, we need to cross-check against the assets that have been successfully materialized and remove any that are in the materialized list.
How I Tested These Changes
New unit tests, plus the following manual test.
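The test assets from the PR description (the import and the two partition definitions are assumed here for completeness, since the description references them without defining them):

```python
import dagster as dg

# Assumed partition definitions; the PR references these names
# without defining them.
daily_partitions_def = dg.DailyPartitionsDefinition(start_date="2024-01-01")
weekly_partitions_def = dg.WeeklyPartitionsDefinition(start_date="2024-01-01")


@dg.asset(
    partitions_def=daily_partitions_def,
)
def has_retry_policy(context):
    # Fail on the first attempt; succeed once the run is a retry.
    if context.run.parent_run_id is None:
        raise Exception("I failed")
    else:
        return 1


@dg.asset(partitions_def=weekly_partitions_def)
def another_weekly(context, has_retry_policy):
    return 1
```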
Run a backfill of at least one week. You can watch the backfill progress and see that once all of the daily assets retry and succeed, the weekly asset gets kicked off. This contrasts with the behavior in the PR down the stack, where the daily assets would retry and succeed but the weekly asset would never get kicked off.
Changelog
Manual and automatic retries of runs launched by backfills that occur while the backfill is still in progress are now incorporated into the backfill's status.