Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DAR-2707][External] Allow repeated polling of pending export releases #876

Merged
merged 14 commits into from
Jul 19, 2024
3 changes: 3 additions & 0 deletions darwin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ def _run(args: Namespace, parser: ArgumentParser) -> None:
args.video_frames,
args.force_slots,
args.ignore_slots,
args.retry,
args.retry_timeout,
args.retry_interval,
)
elif args.action == "import":
f.dataset_import(
Expand Down
16 changes: 14 additions & 2 deletions darwin/cli_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ def pull_dataset(
video_frames: bool = False,
force_slots: bool = False,
ignore_slots: bool = False,
retry: bool = False,
retry_timeout: int = 600,
retry_interval: int = 10,
) -> None:
"""
Downloads a remote dataset (images and annotations) in the datasets directory.
Expand All @@ -428,6 +431,12 @@ def pull_dataset(
Pulls video frames images instead of video files. Defaults to False.
force_slots: bool
Pulls all slots of items into deeper file structure ({prefix}/{item_name}/{slot_name}/{file_name})
retry: bool
If True, will repeatedly try to download the release if it is still processing until the timeout is reached.
retry_timeout: int
If retrying, total time to wait for the release to be ready for download
retry_interval: int
If retrying, time to wait between retries of checking if the release is ready for download.
"""
version: str = DatasetIdentifier.parse(dataset_slug).version or "latest"
client: Client = _load_client(offline=False, maybe_guest=True)
Expand All @@ -444,19 +453,22 @@ def pull_dataset(
_error("please re-authenticate")

try:
release: Release = dataset.get_release(version)
release: Release = dataset.get_release(version, retry)
dataset.pull(
release=release,
only_annotations=only_annotations,
use_folders=folders,
video_frames=video_frames,
force_slots=force_slots,
ignore_slots=ignore_slots,
retry=retry,
retry_timeout=retry_timeout,
retry_interval=retry_interval,
)
print_new_version_info(client)
except NotFound:
_error(
f"Version '{dataset.identifier}:{version}' does not exist "
f"Version '{dataset.identifier}:{version}' does not exist. "
f"Use 'darwin dataset releases' to list all available versions."
)
except UnsupportedExportFormat as uef:
Expand Down
16 changes: 16 additions & 0 deletions darwin/dataset/release.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
import datetime
import shutil
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional

import requests

from darwin.dataset.identifier import DatasetIdentifier


class ReleaseStatus(Enum):
    """
    The processing status of a ``Release``. Values mirror the ``status``
    strings in the release payload (see ``Release.parse_json``, which passes
    ``payload["status"]`` through to the ``Release`` constructor).
    """

    # Export has been requested but is not yet ready for download.
    PENDING = "pending"
    # Export finished successfully and can be downloaded.
    COMPLETE = "complete"
    # Export processing failed.
    FAILED = "failed"


class Release:
"""
Represents a release/export. Releases created this way can only contain items with 'completed'
Expand All @@ -22,6 +30,8 @@ class Release:
The version of the ``Release``.
name : str
The name of the ``Release``.
status : ReleaseStatus
The status of the ``Release``.
url : Optional[str]
The full url used to download the ``Release``.
export_date : datetime.datetime
Expand All @@ -47,6 +57,8 @@ class Release:
The version of the ``Release``.
name : str
The name of the ``Release``.
status : ReleaseStatus
The status of the ``Release``.
url : Optional[str]
The full url used to download the ``Release``.
export_date : datetime.datetime
Expand All @@ -69,6 +81,7 @@ def __init__(
team_slug: str,
version: str,
name: str,
status: ReleaseStatus,
url: Optional[str],
export_date: datetime.datetime,
image_count: Optional[int],
Expand All @@ -81,6 +94,7 @@ def __init__(
self.team_slug = team_slug
self.version = version
self.name = name
self.status = ReleaseStatus(status)
self.url = url
self.export_date = export_date
self.image_count = image_count
Expand Down Expand Up @@ -155,6 +169,7 @@ def parse_json(
team_slug=team_slug,
version=payload["version"],
name=payload["name"],
status=payload["status"],
export_date=export_date,
url=None,
available=False,
Expand All @@ -169,6 +184,7 @@ def parse_json(
team_slug=team_slug,
version=payload["version"],
name=payload["name"],
status=payload["status"],
image_count=payload["metadata"]["num_images"],
class_count=len(payload["metadata"]["annotation_classes"]),
export_date=export_date,
Expand Down
65 changes: 58 additions & 7 deletions darwin/dataset/remote_dataset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import shutil
import tempfile
import time
import zipfile
from datetime import datetime
from pathlib import Path
Expand Down Expand Up @@ -207,6 +208,9 @@ def pull(
video_frames: bool = False,
force_slots: bool = False,
ignore_slots: bool = False,
retry: bool = False,
retry_timeout: int = 600,
retry_interval: int = 10,
) -> Tuple[Optional[Callable[[], Iterator[Any]]], int]:
"""
Downloads a remote dataset (images and annotations) to the datasets directory.
Expand Down Expand Up @@ -237,6 +241,8 @@ def pull(
Pulls video frames images instead of video files.
force_slots: bool
Pulls all slots of items into deeper file structure ({prefix}/{item_name}/{slot_name}/{file_name})
retry: bool
    If True, will repeatedly try to download the release while it is still processing, until ``retry_timeout`` is reached.
retry_timeout: int
    If retrying, total time in seconds to wait for the release to be ready for download.
retry_interval: int
    If retrying, time in seconds to wait between checks of whether the release is ready for download.

Returns
-------
Expand All @@ -251,16 +257,44 @@ def pull(
If the given ``release`` has an invalid format.
ValueError
If darwin in unable to get ``Team`` configuration.
ValueError
If the release is still processing after the maximum retry duration.
"""

console = self.console or Console()

if retry and retry_timeout < retry_interval:
raise ValueError(
f"The value of retry_timeout '{retry_timeout}' must be greater than or equal to the value of retry_interval '{retry_interval}'."
)

if release is None:
release = self.get_release()
release = self.get_release(include_unavailable=retry)

if release.format != "json" and release.format != "darwin_json_2":
raise UnsupportedExportFormat(release.format)

if release.status.value == "pending":
if retry:
while release.status.value == "pending" and retry_timeout > 0:
console.print(
f"Release '{release.name}' for dataset '{self.name}' is still processing. Retrying in {retry_interval} seconds... {retry_timeout} seconds left before timeout."
)
time.sleep(retry_interval)
retry_timeout -= retry_interval
release = self.get_release(release.name, include_unavailable=retry)
if release.status.value == "pending":
raise ValueError(
f"Release {release.name} for dataset '{self.name}' is still processing. Please try again later."
)
else:
raise ValueError(
f"Release '{release.name}' for dataset '{self.name}' is still processing. Please wait for it to be ready.\n\n If you would like to automatically retry, set the `retry` parameter to `True` with the SDK, or use the `--retry` flag with the CLI."
)
console.print(
f"Release '{release.name}' for dataset '{self.name}' is ready for download. Starting download..."
)

release_dir = self.local_releases_path / release.name
release_dir.mkdir(parents=True, exist_ok=True)

Expand Down Expand Up @@ -715,24 +749,33 @@ def get_report(self, granularity: str = "day") -> str:
"""

@abstractmethod
def get_releases(self) -> List["Release"]:
def get_releases(self, include_unavailable: bool = False) -> List["Release"]:
"""
Get a sorted list of releases with the most recent first.

Parameters
----------
include_unavailable : bool, default: False
If True, return all releases, including those that are not available.

Returns
-------
List["Release"]
Returns a sorted list of available ``Release``\\s with the most recent first.
"""

def get_release(self, name: str = "latest") -> "Release":
def get_release(
self, name: str = "latest", include_unavailable: bool = True
) -> "Release":
"""
Get a specific ``Release`` for this ``RemoteDataset``.

Parameters
----------
name : str, default: "latest"
Name of the export.
include_unavailable : bool, default: True
If True, return all releases, including those that are not available.

Returns
-------
Expand All @@ -744,22 +787,30 @@ def get_release(self, name: str = "latest") -> "Release":
NotFound
The selected ``Release`` does not exist.
"""
releases = self.get_releases()
releases = self.get_releases(include_unavailable=include_unavailable)
if not releases:
raise NotFound(str(self.identifier))
raise NotFound(
str(
f"No releases found for dataset '{self.name}'. Please create an export of this dataset first."
)
)
JBWilkie marked this conversation as resolved.
Show resolved Hide resolved

# overwrite default name with stored dataset.release if supplied
if self.release and name == "latest":
name = self.release
elif name == "latest":
return next((release for release in releases if release.latest))
return (
sorted(releases, key=lambda x: x.export_date, reverse=True)[0]
if include_unavailable
else next((release for release in releases if release.latest))
)

for release in releases:
if str(release.name) == name:
return release
raise NotFound(
str(
f"Release name {name} not found in dataset {self.name}. Please check this release exists for this dataset."
f"Release name '{name}' not found in dataset '{self.name}'. Please check this release exists for this dataset."
)
)

Expand Down
14 changes: 12 additions & 2 deletions darwin/dataset/remote_dataset_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,15 @@ def __init__(
version=2,
)

def get_releases(self) -> List["Release"]:
def get_releases(self, include_unavailable: bool = False) -> List["Release"]:
"""
Get a sorted list of releases with the most recent first.

Parameters
----------
include_unavailable : bool, default: False
If True, return all releases, including those that are not available.

Returns
-------
List["Release"]
Expand All @@ -135,8 +140,13 @@ def get_releases(self) -> List["Release"]:
Release.parse_json(self.slug, self.team, payload)
for payload in releases_json
]

return sorted(
filter(lambda x: x.available, releases),
(
releases
if include_unavailable
else filter(lambda x: x.available, releases)
),
key=lambda x: x.version,
reverse=True,
)
Expand Down
18 changes: 18 additions & 0 deletions darwin/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,24 @@ def __init__(self) -> None:
action="store_true",
help="Pulls video frame images instead of video files.",
)
parser_pull.add_argument(
"--retry",
action="store_true",
default=False,
help="Repeatedly try to download the release if it is still processing.",
)
parser_pull.add_argument(
"--retry-timeout",
type=int,
default=600,
help="Total time to wait for the release to be ready for download.",
)
parser_pull.add_argument(
"--retry-interval",
type=int,
default=10,
help="Time to wait between retries of checking if the release is ready for download.",
)
slots_group = parser_pull.add_mutually_exclusive_group()
slots_group.add_argument(
"--force-slots",
Expand Down
6 changes: 4 additions & 2 deletions tests/darwin/dataset/release_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import shutil
from datetime import datetime
from pathlib import Path
from unittest.mock import patch

import pytest
import requests

from darwin.dataset.release import Release
from darwin.dataset.release import Release, ReleaseStatus
from tests.fixtures import *


Expand All @@ -16,8 +17,9 @@ def release(dataset_slug: str, team_slug_darwin_json_v2: str) -> Release:
team_slug=team_slug_darwin_json_v2,
version="latest",
name="test",
status=ReleaseStatus("pending"),
url="http://test.v7labs.com/",
export_date="now",
export_date=datetime.fromisoformat("2021-01-01T00:00:00+00:00"),
image_count=None,
class_count=None,
available=True,
Expand Down
Loading