|
53 | 53 | get_asset_backfill_run_chunk_size,
|
54 | 54 | )
|
55 | 55 | from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
|
| 56 | +from dagster._core.execution.plan.resume_retry import ReexecutionStrategy |
56 | 57 | from dagster._core.remote_representation import (
|
57 | 58 | InProcessCodeLocationOrigin,
|
58 | 59 | RemoteRepository,
|
59 | 60 | RemoteRepositoryOrigin,
|
60 | 61 | )
|
| 62 | +from dagster._core.remote_representation.code_location import CodeLocation |
61 | 63 | from dagster._core.storage.compute_log_manager import ComputeIOType
|
62 | 64 | from dagster._core.storage.dagster_run import (
|
63 | 65 | IN_PROGRESS_RUN_STATUSES,
|
|
69 | 71 | ASSET_PARTITION_RANGE_END_TAG,
|
70 | 72 | ASSET_PARTITION_RANGE_START_TAG,
|
71 | 73 | BACKFILL_ID_TAG,
|
| 74 | + BACKFILL_TAGS, |
72 | 75 | MAX_RETRIES_TAG,
|
73 | 76 | PARTITION_NAME_TAG,
|
74 | 77 | )
|
@@ -3169,9 +3172,9 @@ def test_asset_backfill_retries_make_downstreams_runnable(
|
3169 | 3172 | def test_run_retry_not_part_of_completed_backfill(
|
3170 | 3173 | instance: DagsterInstance,
|
3171 | 3174 | workspace_context: WorkspaceProcessContext,
|
| 3175 | + code_location: CodeLocation, |
3172 | 3176 | remote_repo: RemoteRepository,
|
3173 | 3177 | ):
|
3174 |
| - del remote_repo |
3175 | 3178 | backfill_id = "run_retries_backfill"
|
3176 | 3179 | partition_keys = static_partitions.get_partition_keys()
|
3177 | 3180 | asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")]
|
@@ -3215,47 +3218,29 @@ def test_run_retry_not_part_of_completed_backfill(
|
3215 | 3218 | assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
|
3216 | 3219 |
|
3217 | 3220 | # manual retry of a run
|
3218 |
| - instance.create_reexecuted_run() |
3219 | 3221 |
|
3220 | 3222 | # simulate a retry of a run
|
3221 | 3223 | run_to_retry = instance.get_runs()[0]
|
3222 |
| - retried_run = create_run_for_test( |
3223 |
| - instance=instance, |
3224 |
| - job_name=run_to_retry.job_name, |
3225 |
| - tags=run_to_retry.tags, |
3226 |
| - root_run_id=run_to_retry.run_id, |
3227 |
| - parent_run_id=run_to_retry.run_id, |
| 3224 | + remote_job = remote_repo.get_full_job(run_to_retry.job_name) |
| 3225 | + retried_run = instance.create_reexecuted_run( |
| 3226 | + parent_run=run_to_retry, |
| 3227 | + code_location=code_location, |
| 3228 | + remote_job=remote_job, |
| 3229 | + strategy=ReexecutionStrategy.ALL_STEPS, |
3228 | 3230 | )
|
3229 | 3231 |
|
3230 |
| - # since there is a run in progress, the backfill should not be marked as complete, even though |
3231 |
| - # all targeted asset partitions have a completed state |
| 3232 | + for tag in BACKFILL_TAGS: |
| 3233 | + assert tag not in retried_run.tags.keys() |
| 3234 | + |
| 3235 | + # Since the backfill is alerady complete, it should not be processed by the backfill daemon and |
| 3236 | + # should remain in a completed state |
3232 | 3237 | list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
|
3233 | 3238 | backfill = instance.get_backfill(backfill_id)
|
3234 | 3239 | assert backfill
|
3235 |
| - assert backfill.asset_backfill_data |
3236 |
| - assert backfill.asset_backfill_data.all_targeted_partitions_have_materialization_status() |
3237 | 3240 | assert backfill.status == BulkActionStatus.REQUESTED
|
3238 | 3241 |
|
3239 |
| - # manually mark the run as successful to show that the backfill will be marked as complete |
3240 |
| - # since there are no in progress runs |
3241 |
| - instance.handle_new_event( |
3242 |
| - EventLogEntry( |
3243 |
| - error_info=None, |
3244 |
| - level="debug", |
3245 |
| - user_message="", |
3246 |
| - run_id=retried_run.run_id, |
3247 |
| - timestamp=time.time(), |
3248 |
| - dagster_event=DagsterEvent( |
3249 |
| - event_type_value=DagsterEventType.RUN_SUCCESS.value, |
3250 |
| - job_name=retried_run.job_name, |
3251 |
| - ), |
3252 |
| - ) |
3253 |
| - ) |
3254 |
| - |
3255 |
| - retried_run = instance.get_runs(filters=RunsFilter(run_ids=[retried_run.run_id]))[0] |
3256 |
| - assert retried_run.status == DagsterRunStatus.SUCCESS |
| 3242 | + wait_for_all_runs_to_finish(instance, timeout=30) |
3257 | 3243 |
|
3258 |
| - list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) |
3259 |
| - backfill = instance.get_backfill(backfill_id) |
3260 |
| - assert backfill |
3261 |
| - assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS |
| 3244 | + assert retried_run.run_id not in [ |
| 3245 | + r.run_id for r in instance.get_runs(filters=RunsFilter.for_backfill(backfill_id)) |
| 3246 | + ] |
0 commit comments