From b5ac4d7206e8aaf97e9b050ce86d9825e4645aff Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 9 Oct 2024 16:45:38 -0400 Subject: [PATCH] endpoint --- .../dagster/_core/storage/legacy_storage.py | 3 ++ .../dagster/_core/storage/runs/base.py | 4 ++ .../storage_tests/utils/run_storage.py | 46 +++++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/python_modules/dagster/dagster/_core/storage/legacy_storage.py b/python_modules/dagster/dagster/_core/storage/legacy_storage.py index 268f2aef499a0..6214c1c376b0f 100644 --- a/python_modules/dagster/dagster/_core/storage/legacy_storage.py +++ b/python_modules/dagster/dagster/_core/storage/legacy_storage.py @@ -288,6 +288,9 @@ def wipe(self) -> None: def delete_run(self, run_id: str) -> None: return self._storage.run_storage.delete_run(run_id) + def delete_runs(self, run_ids: Sequence[str]) -> None: + return self._storage.run_storage.delete_runs(run_ids) + def migrate(self, print_fn: Optional[PrintFn] = None, force_rebuild_all: bool = False) -> None: return self._storage.run_storage.migrate(print_fn, force_rebuild_all) diff --git a/python_modules/dagster/dagster/_core/storage/runs/base.py b/python_modules/dagster/dagster/_core/storage/runs/base.py index dc993fe92ec8b..8becb532bafb2 100644 --- a/python_modules/dagster/dagster/_core/storage/runs/base.py +++ b/python_modules/dagster/dagster/_core/storage/runs/base.py @@ -318,6 +318,10 @@ def wipe(self) -> None: def delete_run(self, run_id: str) -> None: """Remove a run from storage.""" + @abstractmethod + def delete_runs(self, run_ids: Sequence[str]) -> None: + """Remove a list of runs from storage.""" + @property def supports_bucket_queries(self) -> bool: return False diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index fe90e70b4de24..fae89cff81982 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -1032,6 +1032,24 @@ def test_delete(self, storage): storage.delete_run(run_id) assert list(storage.get_runs()) == [] + def test_delete_multiple(self, storage): + if not self.can_delete_runs(): + pytest.skip("storage cannot delete runs") + + assert storage + run_ids = [] + for _ in range(3): + run_id = make_new_run_id() + storage.add_run(TestRunStorage.build_run(run_id=run_id, job_name="some_pipeline")) + run_ids.append(run_id) + + storage.add_run( + TestRunStorage.build_run(run_id=make_new_run_id(), job_name="some_pipeline") + ) + assert len(storage.get_runs()) == 4 + storage.delete_runs(run_ids) + assert len(storage.get_runs()) == 1 + def test_delete_with_tags(self, storage: RunStorage): if not self.can_delete_runs(): pytest.skip("storage cannot delete runs") @@ -1051,6 +1069,34 @@ def test_delete_with_tags(self, storage: RunStorage): assert list(storage.get_runs()) == [] assert run_id not in [key for key, value in storage.get_run_tags(tag_keys=[run_id])] + def test_delete_multiple_with_tags(self, storage: RunStorage): + if not self.can_delete_runs(): + pytest.skip("storage cannot delete runs") + + assert storage + run_ids = [] + for _ in range(3): + run_id = make_new_run_id() + storage.add_run( + TestRunStorage.build_run( + run_id=run_id, + job_name="some_pipeline", + tags={run_id: run_id}, + ) + ) + run_ids.append(run_id) + + storage.add_run( + TestRunStorage.build_run( + run_id=make_new_run_id(), + job_name="some_pipeline", + tags={"not_deleted": "true"}, + ) + ) + assert len(storage.get_runs()) == 4 + storage.delete_runs(run_ids) + assert len(storage.get_runs()) == 1 + def test_wipe_tags(self, storage: RunStorage): if not self.can_delete_runs(): pytest.skip("storage cannot delete")