
[backfill daemon run retries 2/n] backfill daemon incorporates retries runs when launching new runs #25853

Merged
merged 8 commits into master from jamie/backfill-daemon-accounts-for-retries
Dec 5, 2024

Conversation

jamiedemaria
Contributor

@jamiedemaria jamiedemaria commented Nov 11, 2024

Summary & Motivation

The backfill daemon doesn't account for run retries. See https://github.com/dagster-io/internal/discussions/12460 for more context. We've decided that we want the daemon to account for automatic and manual retries of runs that occur while the backfill is still in progress. This requires two changes: ensuring the backfill isn't marked completed if there is an in progress run or a failed run that will be automatically retried; and updating the daemon to take the results of retried runs into account when deciding what partitions to materialize in the next iteration.

This PR addresses the second point, updating the backfill daemon to take the results of retried runs into account when deciding what partitions to materialize in the next iteration.

Currently the backfill gets a list of the successfully materialized assets for the backfill by looking at the materialization events for each asset. It determines which assets failed by looking at the failed runs launched by the backfill and pulling the asset partition information from those runs. Any assets downstream of those failed assets will not be launched by the backfill.

Now that we want the backfill daemon to account for run retries, we need to slightly modify this logic. Since a run can be retried, it is possible for an asset to have a successful materialization AND be a failed asset in a failed run. This means that when we determine which assets are failed, we need to cross-check against the assets that have been successfully materialized and remove any that are in the materialized list.
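As a rough sketch, the cross-check amounts to a set difference. The asset and partition names and the plain-set representation below are assumptions for illustration only, not the daemon's actual data structures:

```python
# Illustrative sketch of the cross-check described above; plain sets stand in
# for the daemon's asset-partition subsets (an assumption for illustration).
materialized = {("daily_asset", "2024-11-01"), ("daily_asset", "2024-11-02")}
failed_in_failed_runs = {("daily_asset", "2024-11-01"), ("daily_asset", "2024-11-03")}

# 2024-11-01 appeared in a failed run but was later materialized by a retry,
# so it is dropped; only 2024-11-03 is still considered failed.
still_failed = failed_in_failed_runs - materialized
```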

How I Tested These Changes

new unit tests

manual test

```python
import dagster as dg

# Partition definitions are assumed for this manual test; run retries are also
# assumed to be enabled (e.g. via the run retries daemon and the
# dagster/max_retries run tag), since parent_run_id is only set on retried runs.
daily_partitions_def = dg.DailyPartitionsDefinition(start_date="2024-11-01")
weekly_partitions_def = dg.WeeklyPartitionsDefinition(start_date="2024-11-01")

@dg.asset(
    partitions_def=daily_partitions_def,
)
def has_retry_policy(context):
    # fail on the first attempt; succeed on the retried run
    if context.run.parent_run_id is None:
        raise Exception("I failed")
    else:
        return 1

@dg.asset(partitions_def=weekly_partitions_def)
def another_weekly(context, has_retry_policy):
    return 1
```

Run a backfill spanning at least one week. You can watch the backfill progress and see that once all of the daily assets retry and succeed, the weekly asset gets kicked off. This contrasts with the behavior in the PR down the stack, where the daily assets would retry and succeed but the weekly asset would never get kicked off.

Changelog

Manual and automatic retries of runs launched by backfills that occur while the backfill is still in progress are now incorporated into the backfill's status

@jamiedemaria jamiedemaria changed the title backfill daemon incorporates retries runs backfill daemon incorporates retries runs when launching new runs Nov 11, 2024
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from 0493541 to bf580e4 Compare November 11, 2024 20:52
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch 3 times, most recently from 8a2005d to 6ce3f62 Compare November 11, 2024 21:54
Comment on lines 1825 to 1844
```python
for asset_key in failed_asset_keys:
    result.extend(
        asset_graph.get_partitions_in_range(
            asset_key, partition_range, instance_queryer
        )
    asset_partition_candidates = asset_graph.get_partitions_in_range(
        asset_key, partition_range, instance_queryer
    )
```
There appears to be a scoping issue with asset_partition_candidates. The code creates these candidates for partition ranges but never uses them because the result.extend(asset_partitions_still_failed) call is outside both branches of the if/else. To fix this:

  1. Move the asset_partition_candidates assignment inside the for loop in the first branch
  2. Move result.extend(asset_partitions_still_failed) inside each branch of the if/else

This ensures both partition ranges and single partitions are properly filtered against the materialized subset before being added to the result.
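A minimal self-contained sketch of that structure, with the asset graph and instance queryer stubbed out as plain data (an assumption for illustration; this is not the actual dagster code):

```python
def partitions_still_failed(failed_asset_keys, partitions_by_key, materialized_subset):
    """For each failed asset, expand its candidate partitions and keep only
    those that have not since been materialized, extending the result inside
    the loop as the review comment suggests."""
    result = []
    for asset_key in failed_asset_keys:
        # stand-in for asset_graph.get_partitions_in_range(...)
        candidates = partitions_by_key.get(asset_key, [])
        still_failed = [c for c in candidates if c not in materialized_subset]
        result.extend(still_failed)
    return result

# a daily asset with two failed partitions, one of which has been rematerialized
print(
    partitions_still_failed(
        ["daily"],
        {"daily": [("daily", "p1"), ("daily", "p2")]},
        {("daily", "p1")},
    )
)
```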

Spotted by Graphite Reviewer


@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from 6ce3f62 to 721e0a4 Compare November 12, 2024 15:24
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from cd3cc4a to 1fb8fc2 Compare November 12, 2024 15:48
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from 721e0a4 to 514d595 Compare November 12, 2024 15:48
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from 1fb8fc2 to 436cffd Compare November 12, 2024 18:57
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from eb96c61 to a691a45 Compare November 12, 2024 18:57
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from 436cffd to 6dbf15a Compare November 12, 2024 20:41
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch 2 times, most recently from c93546d to 5b52878 Compare November 12, 2024 20:44
```python
    asset_graph,
)
updated_backfill_data = AssetBackfillData(
    target_subset=asset_backfill_data.target_subset,
    latest_storage_id=asset_backfill_data.latest_storage_id,
    requested_runs_for_target_roots=asset_backfill_data.requested_runs_for_target_roots,
    materialized_subset=updated_materialized_subset,
    failed_and_downstream_subset=asset_backfill_data.failed_and_downstream_subset
    | failed_subset,
    failed_and_downstream_subset=failed_subset,
```
Contributor Author
@jamiedemaria jamiedemaria Nov 12, 2024

Want to confirm that this is a safe change to make. From my reading, _get_failed_asset_partitions returns the full list of failed partitions, not just the partitions that failed since the last tick, so in the version of this code before this PR the ORing of the two subsets is a no-op, since asset_backfill_data.failed_and_downstream_subset would be a subset of failed_subset.

With the change to have _get_failed_asset_partitions account for retries, ORing with asset_backfill_data.failed_and_downstream_subset would produce inaccurate data: a failed partition in asset_backfill_data.failed_and_downstream_subset could have been successfully retried and therefore no longer be in failed_subset, but would still be included because of the OR operation.
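The concern can be illustrated with plain sets (illustrative partition names, not the real subset objects):

```python
# Before this PR: failed_subset was the full failed list, so the previous
# failed_and_downstream_subset was a subset of it and the OR was a no-op.
prev_failed_and_downstream = {"p1"}
full_failed_subset = {"p1", "p2"}
assert prev_failed_and_downstream | full_failed_subset == full_failed_subset

# With retries accounted for: p1 was successfully retried and dropped out of
# failed_subset, but the OR would incorrectly resurrect it as failed.
failed_subset_after_retries = {"p2"}
assert prev_failed_and_downstream | failed_subset_after_retries == {"p1", "p2"}
```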

Contributor Author

Ah ok, I forgot that we run a second function to get the downstream assets of the failed subset, so this isn't a simple replacement, since failed_subset doesn't include downstream assets. I will update.

@jamiedemaria jamiedemaria marked this pull request as ready for review November 12, 2024 20:49
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from 6dbf15a to 6f60763 Compare November 13, 2024 16:03
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from 389f553 to 46e48b3 Compare November 13, 2024 16:03
@jamiedemaria jamiedemaria changed the title backfill daemon incorporates retries runs when launching new runs [backfill daemon run retries 2/n] backfill daemon incorporates retries runs when launching new runs Nov 13, 2024
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from 46e48b3 to eade146 Compare November 13, 2024 16:40
Member
@gibsondan gibsondan left a comment

I think @clairelin135 is more familiar with the ins and outs of this particular codepath than I am, but I do have one question.

```python
updated_backfill_data = AssetBackfillData(
    target_subset=asset_backfill_data.target_subset,
    latest_storage_id=asset_backfill_data.latest_storage_id,
    requested_runs_for_target_roots=asset_backfill_data.requested_runs_for_target_roots,
    materialized_subset=updated_materialized_subset,
    failed_and_downstream_subset=asset_backfill_data.failed_and_downstream_subset
    | failed_subset,
    failed_and_downstream_subset=updated_failed_subset,
```
Member

I'm not super familiar with this code, but how is the 'and downstream' part of failed_and_downstream_subset affected by the retry case? If we add downstream assets to the failed list as soon as we detect a partition failure, do we need to remove them when that run is retried? Or are those added at the very end, after we have decided that the backfill is totally done and failed?

Contributor Author

So this is specifically for the case when the backfill has been canceled.

In the normal code path what we do is:

  • find which partitions failed
  • do a BFS search to find all downstream partitions of the failed partitions
  • set that as the failed_and_downstream_subset
  • when determining if we can launch a run for a partition, if it is in the failed_and_downstream_subset we know we cannot launch the run

We run this process from scratch on each backfill iteration, so we can handle run retries in the first step by checking whether each failed partition has since been retried successfully; if so, we remove it from the list of failed partitions, and all the following steps then produce the correct results.
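The steps above could be sketched like this. This is a minimal illustration: the daemon's real traversal works over the asset graph, which is stubbed here as a plain adjacency dict, and the partition names are made up:

```python
from collections import deque

def failed_and_downstream(failed, children):
    """BFS from the failed partitions to collect everything downstream of them."""
    result = set(failed)
    queue = deque(failed)
    while queue:
        node = queue.popleft()
        for child in children.get(node, ()):
            if child not in result:
                result.add(child)
                queue.append(child)
    return result

# A daily partition feeds a weekly partition; while the daily one is still
# failed, the weekly one lands in the subset and is excluded from launches.
children = {"daily/2024-11-01": ["weekly/2024-W44"]}
print(failed_and_downstream({"daily/2024-11-01"}, children))
```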

For this canceling case it's a bit different. The way it works now is that we take the previous failed_and_downstream_subset and just add any partitions that have failed (effectively this adds the partitions that failed since the last tick). We don't recalculate the downstream subset for any newly failed partitions (I'm not sure why; maybe @clairelin135 knows).

If we leave the code as is, we could have a case where a failed partition was retried and that retry succeeded since the previous tick. The partition would be in the failed_and_downstream_subset from the previous tick but is now successful. We want to make sure the right status is reported, so we need to remove it from the final set of failed partitions, and we can do that by removing the materialized_subset from the failed_and_downstream_subset.

Another option would be to follow the process we use in a normal tick and generate the failed_and_downstream_subset from scratch rather than taking this additive approach. I'm not sure why that wasn't done in the first place, so I didn't make that change.

Contributor

we don't recalculate the downstream subset for any newly failed partitions

I think we added this for performance reasons: if a backfill is canceling, we do the cheapest thing possible, which is to mark every requested partition as failed. We don't bother doing the graph traversal to mark their downstreams as failed.

Contributor

The changes here in the canceling iteration look good to me

Contributor
@clairelin135 clairelin135 left a comment

The changes here look good to me.

Initially I wondered whether we could avoid moving a partition to the failed subset until all of its retries have concluded, to simplify the state transitions here. But since we're incorporating manual retries, we need to be able to move partitions and their downstreams between the failed and materialized subsets regardless.


```diff
@@ -1274,16 +1274,29 @@ def get_canceling_asset_backfill_iteration_data(
     )

     failed_subset = AssetGraphSubset.from_asset_partition_set(
-        set(_get_failed_asset_partitions(instance_queryer, backfill_id, asset_graph)),
+        set(
```
Contributor

We should make sure that if the backfill is in a canceling state, retried runs are not included in the backfill

@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from 6f60763 to 41d95a9 Compare November 25, 2024 18:52
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from eade146 to 7408d71 Compare November 25, 2024 18:52
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from 19c220c to e7ea44d Compare November 27, 2024 20:19
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from ad1c316 to de67c1d Compare November 27, 2024 20:19
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from e7ea44d to f9c706b Compare November 27, 2024 20:58
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from de67c1d to 6cb78c1 Compare November 27, 2024 20:58
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from f9c706b to 7c44e69 Compare November 27, 2024 21:45
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from 6cb78c1 to a89be73 Compare November 27, 2024 21:45
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from 7c44e69 to 973062b Compare December 2, 2024 14:49
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from a89be73 to f6c77ca Compare December 2, 2024 14:49
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from 973062b to f810d43 Compare December 3, 2024 17:34
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch 2 times, most recently from eea3924 to 5126053 Compare December 4, 2024 17:10
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-termination-change branch from 3c089a8 to 701562d Compare December 5, 2024 17:15
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from 5126053 to 4ce8b5a Compare December 5, 2024 17:15
Base automatically changed from jamie/backfill-daemon-termination-change to master December 5, 2024 18:06
@jamiedemaria jamiedemaria force-pushed the jamie/backfill-daemon-accounts-for-retries branch from 4ce8b5a to d6f6797 Compare December 5, 2024 18:07
@jamiedemaria jamiedemaria merged commit 4a0ae0a into master Dec 5, 2024
1 check failed
@jamiedemaria jamiedemaria deleted the jamie/backfill-daemon-accounts-for-retries branch December 5, 2024 18:36
cmpadden pushed a commit that referenced this pull request Dec 5, 2024
…s runs when launching new runs (#25853)

pskinnerthyme pushed a commit to pskinnerthyme/dagster that referenced this pull request Dec 16, 2024
…s runs when launching new runs (dagster-io#25853)

3 participants