diff --git a/darwin/cli.py b/darwin/cli.py index 003f20802..50b628c02 100644 --- a/darwin/cli.py +++ b/darwin/cli.py @@ -155,6 +155,9 @@ def _run(args: Namespace, parser: ArgumentParser) -> None: args.video_frames, args.force_slots, args.ignore_slots, + args.retry, + args.retry_timeout, + args.retry_interval, ) elif args.action == "import": f.dataset_import( diff --git a/darwin/cli_functions.py b/darwin/cli_functions.py index efde7456f..395c7c829 100644 --- a/darwin/cli_functions.py +++ b/darwin/cli_functions.py @@ -410,6 +410,9 @@ def pull_dataset( video_frames: bool = False, force_slots: bool = False, ignore_slots: bool = False, + retry: bool = False, + retry_timeout: int = 600, + retry_interval: int = 10, ) -> None: """ Downloads a remote dataset (images and annotations) in the datasets directory. @@ -428,6 +431,12 @@ def pull_dataset( Pulls video frames images instead of video files. Defaults to False. force_slots: bool Pulls all slots of items into deeper file structure ({prefix}/{item_name}/{slot_name}/{file_name}) + retry: bool + If True, will repeatedly try to download the release if it is still processing until the timeout is reached. + retry_timeout: int + If retrying, total time to wait for the release to be ready for download + retry_interval: int + If retrying, time to wait between retries of checking if the release is ready for download. 
""" version: str = DatasetIdentifier.parse(dataset_slug).version or "latest" client: Client = _load_client(offline=False, maybe_guest=True) @@ -444,7 +453,7 @@ def pull_dataset( _error("please re-authenticate") try: - release: Release = dataset.get_release(version) + release: Release = dataset.get_release(version, retry) dataset.pull( release=release, only_annotations=only_annotations, @@ -452,11 +461,14 @@ def pull_dataset( video_frames=video_frames, force_slots=force_slots, ignore_slots=ignore_slots, + retry=retry, + retry_timeout=retry_timeout, + retry_interval=retry_interval, ) print_new_version_info(client) except NotFound: _error( - f"Version '{dataset.identifier}:{version}' does not exist " + f"Version '{dataset.identifier}:{version}' does not exist. " f"Use 'darwin dataset releases' to list all available versions." ) except UnsupportedExportFormat as uef: diff --git a/darwin/dataset/release.py b/darwin/dataset/release.py index a8ce9e92d..54e32fbb7 100644 --- a/darwin/dataset/release.py +++ b/darwin/dataset/release.py @@ -1,12 +1,20 @@ import datetime import shutil +from enum import Enum from pathlib import Path from typing import Any, Dict, Optional import requests + from darwin.dataset.identifier import DatasetIdentifier +class ReleaseStatus(Enum): + PENDING = "pending" + COMPLETE = "complete" + FAILED = "failed" + + class Release: """ Represents a release/export. Releases created this way can only contain items with 'completed' @@ -22,6 +30,8 @@ class Release: The version of the ``Release``. name : str The name of the ``Release``. + status : ReleaseStatus + The status of the ``Release``. url : Optional[str] The full url used to download the ``Release``. export_date : datetime.datetime @@ -47,6 +57,8 @@ class Release: The version of the ``Release``. name : str The name of the ``Release``. + status : ReleaseStatus + The status of the ``Release``. url : Optional[str] The full url used to download the ``Release``. 
export_date : datetime.datetime @@ -69,6 +81,7 @@ def __init__( self, team_slug: str, version: str, name: str, + status: ReleaseStatus, url: Optional[str], export_date: datetime.datetime, image_count: Optional[int], @@ -81,6 +94,7 @@ self.team_slug = team_slug self.version = version self.name = name + self.status = ReleaseStatus(status) self.url = url self.export_date = export_date self.image_count = image_count @@ -155,6 +169,7 @@ def parse_json( team_slug=team_slug, version=payload["version"], name=payload["name"], + status=payload["status"], export_date=export_date, url=None, available=False, @@ -169,6 +184,7 @@ team_slug=team_slug, version=payload["version"], name=payload["name"], + status=payload["status"], image_count=payload["metadata"]["num_images"], class_count=len(payload["metadata"]["annotation_classes"]), export_date=export_date, diff --git a/darwin/dataset/remote_dataset.py b/darwin/dataset/remote_dataset.py index da517ae43..c96ef194b 100644 --- a/darwin/dataset/remote_dataset.py +++ b/darwin/dataset/remote_dataset.py @@ -1,6 +1,7 @@ import os import shutil import tempfile +import time import zipfile from datetime import datetime from pathlib import Path @@ -207,6 +208,9 @@ def pull( video_frames: bool = False, force_slots: bool = False, ignore_slots: bool = False, + retry: bool = False, + retry_timeout: int = 600, + retry_interval: int = 10, ) -> Tuple[Optional[Callable[[], Iterator[Any]]], int]: """ Downloads a remote dataset (images and annotations) to the datasets directory. @@ -237,6 +241,8 @@ def pull( Pulls video frames images instead of video files. force_slots: bool Pulls all slots of items into deeper file structure ({prefix}/{item_name}/{slot_name}/{file_name}) + retry: bool + If True, will repeatedly try to download the release if it is still processing, until ``retry_timeout`` is reached. Returns ------- @@ -251,16 +257,44 @@ def pull( If the given ``release`` has an invalid format. 
ValueError If darwin in unable to get ``Team`` configuration. + ValueError + If the release is still processing after the maximum retry duration. """ console = self.console or Console() + if retry and retry_timeout < retry_interval: + raise ValueError( + f"The value of retry_timeout '{retry_timeout}' must be greater than or equal to the value of retry_interval '{retry_interval}'." + ) + if release is None: - release = self.get_release() + release = self.get_release(include_unavailable=retry) if release.format != "json" and release.format != "darwin_json_2": raise UnsupportedExportFormat(release.format) + if release.status.value == "pending": + if retry: + while release.status.value == "pending" and retry_timeout > 0: + console.print( + f"Release '{release.name}' for dataset '{self.name}' is still processing. Retrying in {retry_interval} seconds... {retry_timeout} seconds left before timeout." + ) + time.sleep(retry_interval) + retry_timeout -= retry_interval + release = self.get_release(release.name, include_unavailable=retry) + if release.status.value == "pending": + raise ValueError( + f"Release {release.name} for dataset '{self.name}' is still processing. Please try again later." + ) + else: + raise ValueError( + f"Release '{release.name}' for dataset '{self.name}' is still processing. Please wait for it to be ready.\n\n If you would like to automatically retry, set the `retry` parameter to `True` with the SDK, or use the `--retry` flag with the CLI." + ) + console.print( + f"Release '{release.name}' for dataset '{self.name}' is ready for download. Starting download..." + ) + release_dir = self.local_releases_path / release.name release_dir.mkdir(parents=True, exist_ok=True) @@ -715,17 +749,24 @@ def get_report(self, granularity: str = "day") -> str: """ @abstractmethod - def get_releases(self) -> List["Release"]: + def get_releases(self, include_unavailable: bool = False) -> List["Release"]: """ Get a sorted list of releases with the most recent first. 
+ Parameters + ---------- + include_unavailable : bool, default: False + If True, return all releases, including those that are not available. + Returns ------- List["Release"] Returns a sorted list of available ``Release``\\s with the most recent first. """ - def get_release(self, name: str = "latest") -> "Release": + def get_release( + self, name: str = "latest", include_unavailable: bool = True + ) -> "Release": """ Get a specific ``Release`` for this ``RemoteDataset``. @@ -733,6 +774,8 @@ def get_release(self, name: str = "latest") -> "Release": ---------- name : str, default: "latest" Name of the export. + include_unavailable : bool, default: True + If True, return all releases, including those that are not available. Returns ------- @@ -744,22 +787,30 @@ def get_release(self, name: str = "latest") -> "Release": NotFound The selected ``Release`` does not exist. """ - releases = self.get_releases() + releases = self.get_releases(include_unavailable=include_unavailable) if not releases: - raise NotFound(str(self.identifier)) + raise NotFound( + str( + f"No releases found for dataset '{self.name}'. Please create an export of this dataset first." + ) + ) # overwrite default name with stored dataset.release if supplied if self.release and name == "latest": name = self.release elif name == "latest": - return next((release for release in releases if release.latest)) + return ( + sorted(releases, key=lambda x: x.export_date, reverse=True)[0] + if include_unavailable + else next((release for release in releases if release.latest)) + ) for release in releases: if str(release.name) == name: return release raise NotFound( str( - f"Release name {name} not found in dataset {self.name}. Please check this release exists for this dataset." + f"Release name '{name}' not found in dataset '{self.name}'. Please check this release exists for this dataset." 
) ) diff --git a/darwin/dataset/remote_dataset_v2.py b/darwin/dataset/remote_dataset_v2.py index 42607a85b..a8764e31b 100644 --- a/darwin/dataset/remote_dataset_v2.py +++ b/darwin/dataset/remote_dataset_v2.py @@ -115,10 +115,15 @@ def __init__( version=2, ) - def get_releases(self) -> List["Release"]: + def get_releases(self, include_unavailable: bool = False) -> List["Release"]: """ Get a sorted list of releases with the most recent first. + Parameters + ---------- + include_unavailable : bool, default: False + If True, return all releases, including those that are not available. + Returns ------- List["Release"] @@ -135,8 +140,13 @@ def get_releases(self) -> List["Release"]: Release.parse_json(self.slug, self.team, payload) for payload in releases_json ] + return sorted( - filter(lambda x: x.available, releases), + ( + releases + if include_unavailable + else filter(lambda x: x.available, releases) + ), key=lambda x: x.version, reverse=True, ) diff --git a/darwin/options.py b/darwin/options.py index db837bb7f..4efa8599c 100644 --- a/darwin/options.py +++ b/darwin/options.py @@ -277,6 +277,24 @@ def __init__(self) -> None: action="store_true", help="Pulls video frame images instead of video files.", ) + parser_pull.add_argument( + "--retry", + action="store_true", + default=False, + help="Repeatedly try to download the release if it is still processing.", + ) + parser_pull.add_argument( + "--retry-timeout", + type=int, + default=600, + help="Total time to wait for the release to be ready for download.", + ) + parser_pull.add_argument( + "--retry-interval", + type=int, + default=10, + help="Time to wait between retries of checking if the release is ready for download.", + ) slots_group = parser_pull.add_mutually_exclusive_group() slots_group.add_argument( "--force-slots", diff --git a/tests/darwin/dataset/release_test.py b/tests/darwin/dataset/release_test.py index a90fc0dc4..2ee86a84c 100644 --- a/tests/darwin/dataset/release_test.py +++ 
b/tests/darwin/dataset/release_test.py @@ -1,11 +1,12 @@ import shutil +from datetime import datetime from pathlib import Path from unittest.mock import patch import pytest import requests -from darwin.dataset.release import Release +from darwin.dataset.release import Release, ReleaseStatus from tests.fixtures import * @@ -16,8 +17,9 @@ def release(dataset_slug: str, team_slug_darwin_json_v2: str) -> Release: team_slug=team_slug_darwin_json_v2, version="latest", name="test", + status=ReleaseStatus("pending"), url="http://test.v7labs.com/", - export_date="now", + export_date=datetime.fromisoformat("2021-01-01T00:00:00+00:00"), image_count=None, class_count=None, available=True, diff --git a/tests/darwin/dataset/remote_dataset_test.py b/tests/darwin/dataset/remote_dataset_test.py index d94b7209d..5073a0c8c 100644 --- a/tests/darwin/dataset/remote_dataset_test.py +++ b/tests/darwin/dataset/remote_dataset_test.py @@ -16,7 +16,7 @@ from darwin.config import Config from darwin.dataset import RemoteDataset from darwin.dataset.download_manager import _download_image_from_json_annotation -from darwin.dataset.release import Release +from darwin.dataset.release import Release, ReleaseStatus from darwin.dataset.remote_dataset_v2 import RemoteDatasetV2 from darwin.dataset.upload_manager import LocalFile, UploadHandlerV2 from darwin.datatypes import ManifestItem, ObjectStore, SegmentManifest @@ -662,6 +662,7 @@ def test_gets_latest_release_when_not_given_one( "team-slug", "0.1.0", "release-name", + ReleaseStatus("complete"), "http://darwin-fake-url.com", datetime.now(), None, @@ -692,6 +693,7 @@ def test_does_not_create_symlink_on_windows( "team-slug", "0.1.0", "release-name", + ReleaseStatus("complete"), "http://darwin-fake-url.com", datetime.now(), None, @@ -724,6 +726,7 @@ def test_continues_if_symlink_creation_fails( "team-slug", "0.1.0", "release-name", + ReleaseStatus("complete"), "http://darwin-fake-url.com", datetime.now(), None, @@ -758,6 +761,7 @@ def 
test_raises_if_release_format_is_not_json( remote_dataset.team, "0.1.0", "release-name", + ReleaseStatus("complete"), "http://darwin-fake-url.com", datetime.now(), None, @@ -779,6 +783,7 @@ def test_moves_properties_metadata_file( "team-slug", "0.1.0", "release-name", + ReleaseStatus("complete"), "http://darwin-fake-url.com", datetime.now(), None, @@ -808,6 +813,27 @@ def fake_download_zip(self, path): ) assert metadata_path.exists() + @patch("time.sleep", return_value=None) + def test_num_retries(self, mock_sleep, remote_dataset, pending_release): + with patch.object(remote_dataset, "get_release", return_value=pending_release): + with pytest.raises(ValueError): + remote_dataset.pull(release=pending_release, retry=True) + assert ( + mock_sleep.call_count == 60 + ) # Default values of 600 seconds / 10 seconds interval + + @patch("time.sleep", return_value=None) + def test_raises_after_max_retry_duration( + self, mock_sleep, remote_dataset, pending_release + ): + with patch.object(remote_dataset, "get_release", return_value=pending_release): + with pytest.raises(ValueError, match="is still processing"): + remote_dataset.pull(release=pending_release, retry=True) + + def test_raises_error_if_timeout_less_than_interval(self, remote_dataset): + with pytest.raises(ValueError): + remote_dataset.pull(retry=True, retry_timeout=5, retry_interval=10) + class TestPullNamingConvention: def _test_pull_naming_convention( @@ -1316,3 +1342,25 @@ def test_register_files_with_blocked_items(self, remote_dataset: RemoteDatasetV2 ) assert len(result["registered"]) == 0 assert len(result["blocked"]) == 1 + + +@pytest.mark.usefixtures("file_read_write_test") +class TestGetReleases: + @patch("darwin.backend_v2.BackendV2.get_exports") + def test_returns_unavailable_releases_when_retry_is_true( + self, mock_get_exports, remote_dataset, releases_api_response + ): + mock_get_exports.return_value = releases_api_response + releases = remote_dataset.get_releases(include_unavailable=True) + assert 
len(releases) == 2 + assert isinstance(releases[0], Release) + assert isinstance(releases[1], Release) + + @patch("darwin.backend_v2.BackendV2.get_exports") + def test_omits_unavailable_releases_when_retry_is_false( + self, mock_get_exports, remote_dataset, releases_api_response + ): + mock_get_exports.return_value = releases_api_response + releases = remote_dataset.get_releases(include_unavailable=False) + assert len(releases) == 1 + assert isinstance(releases[0], Release) diff --git a/tests/fixtures.py b/tests/fixtures.py index 37059da55..045d98888 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -1,11 +1,13 @@ import shutil +from datetime import datetime from pathlib import Path -from typing import Generator +from typing import Generator, List from zipfile import ZipFile import pytest from darwin.config import Config +from darwin.dataset.release import Release, ReleaseStatus @pytest.fixture @@ -118,3 +120,50 @@ def local_config_file( shutil.rmtree(darwin_path) if backup_darwin_path.exists(): shutil.move(backup_darwin_path, darwin_path) + + +@pytest.fixture +def pending_release() -> Release: + return Release( + dataset_slug="dataset_slug", + team_slug="team_slug", + version="1", + name="name", + status=ReleaseStatus("pending"), + url=None, + export_date=datetime.now(), + image_count=1, + class_count=1, + available=False, + latest=False, + format="json", + ) + + +@pytest.fixture +def releases_api_response() -> List[dict]: + return [ + { + "name": "release_1", + "status": ReleaseStatus("complete"), + "version": 1, + "format": "darwin_json_2", + "metadata": { + "num_images": 1, + "annotation_classes": [{"id": 1, "name": "test_class"}], + }, + "inserted_at": "2024-06-28T12:29:02Z", + "latest": True, + "download_url": "download_url", + }, + { + "name": "release_2", + "status": ReleaseStatus("pending"), + "version": 2, + "format": "darwin_json_2", + "metadata": {}, + "inserted_at": "2024-06-28T12:32:33Z", + "latest": False, + "download_url": None, + }, + ]