From 0d4ead61c417a2ccdc920afb90f5215504a2a4ad Mon Sep 17 00:00:00 2001
From: JamieDeMaria
Date: Wed, 9 Oct 2024 15:02:36 -0400
Subject: [PATCH] add delete backfill to storage

---
Reviewer notes (this area, after the three-dash line, is commentary and is
not part of the commit message or of any hunk):

* Purpose: adds a `delete_backfill(backfill_id: str) -> None` operation at
  every layer shown in this patch, so a stored backfill can be removed by id.
* instance/__init__.py: the new method forwards to
  `self._run_storage.delete_backfill(backfill_id)`.
* storage/legacy_storage.py: the new method forwards to
  `self._storage.run_storage.delete_backfill(backfill_id)`; it uses `return`
  on a `None`-returning call, matching the neighbouring `update_backfill`.
* storage/runs/base.py: declares `delete_backfill` with `@abstractmethod`.
  NOTE(review): presumably any run storage subclass that does not implement
  it can no longer be instantiated; confirm for out-of-tree storages.
* storage/runs/sql_run_storage.py: validates `backfill_id` with
  `check.str_param`, then executes a DELETE on `BulkActionsTable` filtered by
  `BulkActionsTable.c.key == backfill_id`. The result of `conn.execute` is
  not inspected, so the code does not distinguish "deleted" from "no such id".
* storage_tests/utils/run_storage.py: `test_delete_backfill` stores backfills
  "one" and "two", adds three runs tagged with `BACKFILL_ID_TAG` for "two"
  plus one untagged run, deletes each backfill in turn, and asserts that
  `get_backfill` returns None afterwards while `get_runs()` still returns all
  four runs (deleting a backfill does not delete its runs).
* NOTE(review): line breaks and indentation were restored from a
  whitespace-collapsed copy of this patch. Context-line whitespace is inferred
  from the hunk counts and Python structure; confirm against the target tree
  before applying.

 .../dagster/_core/instance/__init__.py        |  3 +
 .../dagster/_core/storage/legacy_storage.py   |  3 +
 .../dagster/_core/storage/runs/base.py        |  4 ++
 .../_core/storage/runs/sql_run_storage.py     |  6 ++
 .../storage_tests/utils/run_storage.py        | 59 +++++++++++++++++++
 5 files changed, 75 insertions(+)

diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py
index aff4da0c78b53..06985bea0f6fc 100644
--- a/python_modules/dagster/dagster/_core/instance/__init__.py
+++ b/python_modules/dagster/dagster/_core/instance/__init__.py
@@ -3108,6 +3108,9 @@ def add_backfill(self, partition_backfill: "PartitionBackfill") -> None:
     def update_backfill(self, partition_backfill: "PartitionBackfill") -> None:
         self._run_storage.update_backfill(partition_backfill)
 
+    def delete_backfill(self, backfill_id: str) -> None:
+        self._run_storage.delete_backfill(backfill_id)
+
     @property
     def should_start_background_run_thread(self) -> bool:
         """Gate on an experimental feature to start a thread that monitors for if the run should be canceled."""
diff --git a/python_modules/dagster/dagster/_core/storage/legacy_storage.py b/python_modules/dagster/dagster/_core/storage/legacy_storage.py
index 268f2aef499a0..9272ce2acbc1c 100644
--- a/python_modules/dagster/dagster/_core/storage/legacy_storage.py
+++ b/python_modules/dagster/dagster/_core/storage/legacy_storage.py
@@ -332,6 +332,9 @@ def add_backfill(self, partition_backfill: "PartitionBackfill") -> None:
     def update_backfill(self, partition_backfill: "PartitionBackfill") -> None:
         return self._storage.run_storage.update_backfill(partition_backfill)
 
+    def delete_backfill(self, backfill_id: str) -> None:
+        return self._storage.run_storage.delete_backfill(backfill_id)
+
     def get_run_partition_data(self, runs_filter: "RunsFilter") -> Sequence["RunPartitionData"]:
         return self._storage.run_storage.get_run_partition_data(runs_filter)
 
diff --git a/python_modules/dagster/dagster/_core/storage/runs/base.py b/python_modules/dagster/dagster/_core/storage/runs/base.py
index dc993fe92ec8b..08263b4b38c3a 100644
--- a/python_modules/dagster/dagster/_core/storage/runs/base.py
+++ b/python_modules/dagster/dagster/_core/storage/runs/base.py
@@ -403,6 +403,10 @@ def add_backfill(self, partition_backfill: PartitionBackfill):
     def update_backfill(self, partition_backfill: PartitionBackfill):
         """Update a partition backfill in run storage."""
 
+    @abstractmethod
+    def delete_backfill(self, backfill_id: str) -> None:
+        """Delete a backfill from run storage."""
+
     def alembic_version(self) -> Optional[AlembicVersion]:
         return None
 
diff --git a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py
index 29b7515a6d0d7..f9f66d31eb689 100644
--- a/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py
+++ b/python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py
@@ -1017,6 +1017,12 @@ def update_backfill(self, partition_backfill: PartitionBackfill) -> None:
                 )
             )
 
+    def delete_backfill(self, backfill_id: str) -> None:
+        check.str_param(backfill_id, "backfill_id")
+        query = db.delete(BulkActionsTable).where(BulkActionsTable.c.key == backfill_id)
+        with self.connect() as conn:
+            conn.execute(query)
+
     def get_cursor_values(self, keys: Set[str]) -> Mapping[str, str]:
         check.set_param(keys, "keys", of_type=str)
 
diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py
index 22eec093f6f71..e02825dc0569b 100644
--- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py
+++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py
@@ -1735,6 +1735,65 @@ def test_backfill_id_filtering(self, storage: RunStorage):
         )
         assert backfills_for_id[0].backfill_id == backfill.backfill_id
 
+    def test_delete_backfill(self, storage: RunStorage):
+        origin = self.fake_partition_set_origin("fake_partition_set")
+        backfills = storage.get_backfills()
+        assert len(backfills) == 0
+
+        one = PartitionBackfill(
+            "one",
+            partition_set_origin=origin,
+            status=BulkActionStatus.REQUESTED,
+            partition_names=["a", "b", "c"],
+            from_failure=False,
+            tags={},
+            backfill_timestamp=time.time(),
+        )
+
+        storage.add_backfill(one)
+
+        two = PartitionBackfill(
+            "two",
+            partition_set_origin=origin,
+            status=BulkActionStatus.REQUESTED,
+            partition_names=["a", "b", "c"],
+            from_failure=False,
+            tags={},
+            backfill_timestamp=time.time(),
+        )
+        storage.add_backfill(two)
+        for _ in range(3):
+            storage.add_run(
+                TestRunStorage.build_run(
+                    run_id=make_new_run_id(),
+                    job_name="some_pipeline",
+                    status=DagsterRunStatus.SUCCESS,
+                    tags={BACKFILL_ID_TAG: two.backfill_id},
+                )
+            )
+
+        storage.add_run(
+            TestRunStorage.build_run(
+                run_id=make_new_run_id(),
+                job_name="some_pipeline",
+                status=DagsterRunStatus.SUCCESS,
+                tags={},
+            )
+        )
+
+        storage.delete_backfill("one")
+        assert storage.get_backfill("one") is None
+
+        res = storage.get_backfill("two")
+        assert res
+        assert res.backfill_id == "two"
+
+        assert len(storage.get_runs()) == 4
+        storage.delete_backfill("two")
+        assert storage.get_backfill("two") is None
+        # deleting a backfill does not delete the runs that are part of the backfill
+        assert len(storage.get_runs()) == 4
+
     def test_secondary_index(self, storage):
         self._skip_in_memory(storage)
 