diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py index 67c024be05647..0b81bfd70fc42 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py +++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py @@ -1025,6 +1025,9 @@ def update_backfill(self, partition_backfill: PartitionBackfill) -> None: def delete_backfill(self, backfill_id: str) -> None: check.str_param(backfill_id, "backfill_id") + runs_in_backfill = self.get_run_ids(filters=RunsFilter.for_backfill(backfill_id)) + for run_id in runs_in_backfill: + self.delete_run(run_id) query = db.delete(BulkActionsTable).where(BulkActionsTable.c.key == backfill_id) with self.connect() as conn: conn.execute(query) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index 71b7d7e69ab98..6c0696c66232b 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -1808,12 +1808,34 @@ def test_delete_backfill(self, storage: RunStorage): backfill_timestamp=time.time(), ) storage.add_backfill(two) + storage.add_run( + TestRunStorage.build_run( + run_id=make_new_run_id(), + job_name="some_pipeline", + status=DagsterRunStatus.SUCCESS, + tags={BACKFILL_ID_TAG: two.backfill_id}, + ) + ) + + storage.add_run( + TestRunStorage.build_run( + run_id=make_new_run_id(), + job_name="some_pipeline", + status=DagsterRunStatus.SUCCESS, + tags={}, + ) + ) storage.delete_backfill("one") assert storage.get_backfill("one") is None assert storage.get_backfill("two").backfill_id == "two" + assert len(storage.get_runs()) == 2 + storage.delete_backfill("two") + assert storage.get_backfill("two") is None + assert len(storage.get_runs()) == 1 + def test_secondary_index(self, storage): self._skip_in_memory(storage)