Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DAR-2707][External] Allow repeated polling of pending export releases #876

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions darwin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def _run(args: Namespace, parser: ArgumentParser) -> None:
args.video_frames,
args.force_slots,
args.ignore_slots,
args.retry,
)
elif args.action == "import":
f.dataset_import(
Expand Down
10 changes: 9 additions & 1 deletion darwin/cli_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ def pull_dataset(
video_frames: bool = False,
force_slots: bool = False,
ignore_slots: bool = False,
retry: bool = False,
) -> None:
"""
Downloads a remote dataset (images and annotations) in the datasets directory.
Expand All @@ -428,8 +429,14 @@ def pull_dataset(
Pulls video frames images instead of video files. Defaults to False.
force_slots: bool
Pulls all slots of items into deeper file structure ({prefix}/{item_name}/{slot_name}/{file_name})
retry: bool
If True, will repeatedly try to download the release if it is still processing up to a maximum of 5 minutes.
"""
version: str = DatasetIdentifier.parse(dataset_slug).version or "latest"
if version == "latest" and retry:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few questions here:

  • I don't recall the details but is the name latest hardcoded by us?
  • What happens if a client deliberately passes the name latest with retry=True?
  • I don't think this restriction is necessary, can't we pick the name of the latest release ourselves before performing the download and then do the retry logic using it instead of latest. This would ensure we refer to the same export even if a new export would be created in the meantime

Copy link
Contributor Author

@JBWilkie JBWilkie Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't recall the details but is the name latest hardcoded by us?

Yes, latest is a reserved release name. If you try to create an export named latest, the api responds with {"errors":{"name":["is reserved"]}}

What happens if a client deliberately passes the name latest with retry=True?

We will return the latest available release

I don't think this restriction is necessary, can't we pick the name of the latest release ourselves before performing the download and then do the retry logic using it instead of latest. This would ensure we refer to the same export even if a new export would be created in the meantime

Actually yes, I think we can. This is because each release has an export_date of type datetime.datetime. This allows us to select the most recent release in case retry is passed as True. I'll make this change now, thank you for flagging

raise ValueError(
"To retry downloading a release, a release name must be provided. This can be done as follows:\n\ndarwin dataset pull team-slug/dataset-slug:release-name"
)
client: Client = _load_client(offline=False, maybe_guest=True)
try:
dataset: RemoteDataset = client.get_remote_dataset(
Expand All @@ -444,14 +451,15 @@ def pull_dataset(
_error("please re-authenticate")

try:
release: Release = dataset.get_release(version)
release: Release = dataset.get_release(version, retry)
dataset.pull(
release=release,
only_annotations=only_annotations,
use_folders=folders,
video_frames=video_frames,
force_slots=force_slots,
ignore_slots=ignore_slots,
retry=retry,
)
print_new_version_info(client)
except NotFound:
Expand Down
9 changes: 9 additions & 0 deletions darwin/dataset/release.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Dict, Optional

import requests

from darwin.dataset.identifier import DatasetIdentifier


Expand All @@ -22,6 +23,8 @@ class Release:
The version of the ``Release``.
name : str
The name of the ``Release``.
status : str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsure if it's common in darwin-py but this would be better as an enum as it only has a select few values.

The status of the ``Release``.
url : Optional[str]
The full url used to download the ``Release``.
export_date : datetime.datetime
Expand All @@ -47,6 +50,8 @@ class Release:
The version of the ``Release``.
name : str
The name of the ``Release``.
status : str
The status of the ``Release``.
url : Optional[str]
The full url used to download the ``Release``.
export_date : datetime.datetime
Expand All @@ -69,6 +74,7 @@ def __init__(
team_slug: str,
version: str,
name: str,
status: str,
url: Optional[str],
export_date: datetime.datetime,
image_count: Optional[int],
Expand All @@ -81,6 +87,7 @@ def __init__(
self.team_slug = team_slug
self.version = version
self.name = name
self.status = status
self.url = url
self.export_date = export_date
self.image_count = image_count
Expand Down Expand Up @@ -155,6 +162,7 @@ def parse_json(
team_slug=team_slug,
version=payload["version"],
name=payload["name"],
status=payload["status"],
export_date=export_date,
url=None,
available=False,
Expand All @@ -169,6 +177,7 @@ def parse_json(
team_slug=team_slug,
version=payload["version"],
name=payload["name"],
status=payload["status"],
image_count=payload["metadata"]["num_images"],
class_count=len(payload["metadata"]["annotation_classes"]),
export_date=export_date,
Expand Down
57 changes: 51 additions & 6 deletions darwin/dataset/remote_dataset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import shutil
import tempfile
import time
import zipfile
from datetime import datetime
from pathlib import Path
Expand Down Expand Up @@ -207,6 +208,7 @@ def pull(
video_frames: bool = False,
force_slots: bool = False,
ignore_slots: bool = False,
retry: bool = False,
) -> Tuple[Optional[Callable[[], Iterator[Any]]], int]:
"""
Downloads a remote dataset (images and annotations) to the datasets directory.
Expand Down Expand Up @@ -237,6 +239,8 @@ def pull(
Pulls video frames images instead of video files.
force_slots: bool
Pulls all slots of items into deeper file structure ({prefix}/{item_name}/{slot_name}/{file_name})
retry: bool
If True, will repeatedly try to download the release if it is still processing up to a maximum of 5 minutes.

Returns
-------
Expand All @@ -251,16 +255,46 @@ def pull(
If the given ``release`` has an invalid format.
ValueError
If darwin is unable to get ``Team`` configuration.
ValueError
If the release is still processing after the maximum retry duration.
"""

console = self.console or Console()

if release is None:
release = self.get_release()
if retry:
raise ValueError(
"To retry downloading a release, a release name must be provided. This can be done as follows:\n\nrelease = dataset.get_release(name='release_name')\ndataset.pull(release=release, retry=True)"
)
else:
release = self.get_release(retry=retry)

if release.format != "json" and release.format != "darwin_json_2":
raise UnsupportedExportFormat(release.format)

if release.status == "pending":
if retry:
retry_duration = 300
retry_interval = 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be more conventional to have these configurable via CLI or some SDK settings

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@balysv This makes sense. I can see 2 options, both of which involve building in some validation:

  • 1: Make these values configurable in the ~/.config.yaml file, or:
  • 2: Add two additional arguments: retry_duration and retry_interval with default values of ~10 minutes & ~10 seconds. These can be configured, but if they're passed without retry=True then we will throw an error

I'm leaning toward the additional arguments

while release.status == "pending" and retry_duration > 0:
console.print(
f"Release '{release.name}' for dataset '{self.name}' is still processing. Retrying in {retry_interval} seconds... {retry_duration} seconds left before timeout."
)
time.sleep(retry_interval)
retry_duration -= retry_interval
release = self.get_release(release.name, retry=retry)
if release.status == "pending":
raise ValueError(
f"Release {release.name} for dataset '{self.name}' is still processing after {retry_interval} seconds. Please try again later."
)
else:
raise ValueError(
f"Release '{release.name}' for dataset '{self.name}' is still processing. Please wait for it to be ready.\n\n If you would like to automatically retry, set the `retry` parameter to `True` with the SDK, or use the `--retry` flag with the CLI."
)
console.print(
f"Release '{release.name}' for dataset '{self.name}' is ready for download. Starting download..."
)

release_dir = self.local_releases_path / release.name
release_dir.mkdir(parents=True, exist_ok=True)

Expand Down Expand Up @@ -715,24 +749,31 @@ def get_report(self, granularity: str = "day") -> str:
"""

@abstractmethod
def get_releases(self) -> List["Release"]:
def get_releases(self, retry: bool = False) -> List["Release"]:
"""
Get a sorted list of releases with the most recent first.

Parameters
----------
retry : bool, default: False
If True, return all releases, including those that are not available.

Returns
-------
List["Release"]
Returns a sorted list of available ``Release``\\s with the most recent first.
"""

def get_release(self, name: str = "latest") -> "Release":
def get_release(self, name: str = "latest", retry: bool = True) -> "Release":
"""
Get a specific ``Release`` for this ``RemoteDataset``.

Parameters
----------
name : str, default: "latest"
Name of the export.
retry : bool, default: True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of the argument retry doesn't match what it's doing ("return all releases, if true"). It should be something like include_pending or similar, which is a bit more self-explanatory.

If True, return all releases, including those that are not available.

Returns
-------
Expand All @@ -744,9 +785,13 @@ def get_release(self, name: str = "latest") -> "Release":
NotFound
The selected ``Release`` does not exist.
"""
releases = self.get_releases()
releases = self.get_releases(retry)
if not releases:
raise NotFound(str(self.identifier))
raise NotFound(
str(
f"No releases found for dataset '{self.name}'. Please create an export of this dataset first."
)
)
JBWilkie marked this conversation as resolved.
Show resolved Hide resolved

# overwrite default name with stored dataset.release if supplied
if self.release and name == "latest":
Expand All @@ -759,7 +804,7 @@ def get_release(self, name: str = "latest") -> "Release":
return release
raise NotFound(
str(
f"Release name {name} not found in dataset {self.name}. Please check this release exists for this dataset."
f"Release name '{name}' not found in dataset '{self.name}'. Please check this release exists for this dataset."
)
)

Expand Down
25 changes: 19 additions & 6 deletions darwin/dataset/remote_dataset_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,15 @@ def __init__(
version=2,
)

def get_releases(self) -> List["Release"]:
def get_releases(self, retry: bool = False) -> List["Release"]:
"""
Get a sorted list of releases with the most recent first.

Parameters
----------
retry : bool, default: False
If True, return all releases, including those that are not available.

Returns
-------
List["Release"]
Expand All @@ -135,11 +140,19 @@ def get_releases(self) -> List["Release"]:
Release.parse_json(self.slug, self.team, payload)
for payload in releases_json
]
return sorted(
filter(lambda x: x.available, releases),
key=lambda x: x.version,
reverse=True,
)

if retry:
return sorted(
releases,
key=lambda x: x.version,
reverse=True,
)
else:
return sorted(
filter(lambda x: x.available, releases),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure you can have this nicer where only the first argument releases or filter(lambda x: x.available, releases) is chosen with an if and have a single return line:

return sorted(
                releases_fn,
                key=lambda x: x.version,
                reverse=True,
            )

key=lambda x: x.version,
reverse=True,
)

def push(
self,
Expand Down
5 changes: 5 additions & 0 deletions darwin/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,11 @@ def __init__(self) -> None:
action="store_true",
help="Pulls video frame images instead of video files.",
)
parser_pull.add_argument(
"--retry",
action="store_true",
help="Repeatedly try to download the release if it is still processing. Times out after 5 minutes.",
)
slots_group = parser_pull.add_mutually_exclusive_group()
slots_group.add_argument(
"--force-slots",
Expand Down
1 change: 1 addition & 0 deletions tests/darwin/dataset/release_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def release(dataset_slug: str, team_slug_darwin_json_v2: str) -> Release:
team_slug=team_slug_darwin_json_v2,
version="latest",
name="test",
status="test_status",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For documentation purposes, it'd be best to use actual values of export statuses here instead of stubs as they are enums and not arbitrary strings.

url="http://test.v7labs.com/",
export_date="now",
image_count=None,
Expand Down
48 changes: 48 additions & 0 deletions tests/darwin/dataset/remote_dataset_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,7 @@ def test_gets_latest_release_when_not_given_one(
"team-slug",
"0.1.0",
"release-name",
"release-status",
"http://darwin-fake-url.com",
datetime.now(),
None,
Expand Down Expand Up @@ -692,6 +693,7 @@ def test_does_not_create_symlink_on_windows(
"team-slug",
"0.1.0",
"release-name",
"release-status",
"http://darwin-fake-url.com",
datetime.now(),
None,
Expand Down Expand Up @@ -724,6 +726,7 @@ def test_continues_if_symlink_creation_fails(
"team-slug",
"0.1.0",
"release-name",
"release-status",
"http://darwin-fake-url.com",
datetime.now(),
None,
Expand Down Expand Up @@ -758,6 +761,7 @@ def test_raises_if_release_format_is_not_json(
remote_dataset.team,
"0.1.0",
"release-name",
"release-status",
"http://darwin-fake-url.com",
datetime.now(),
None,
Expand All @@ -779,6 +783,7 @@ def test_moves_properties_metadata_file(
"team-slug",
"0.1.0",
"release-name",
"release-status",
"http://darwin-fake-url.com",
datetime.now(),
None,
Expand Down Expand Up @@ -808,6 +813,27 @@ def fake_download_zip(self, path):
)
assert metadata_path.exists()

def test_pull_raises_value_error_when_retry_is_true_and_release_is_none(
self, remote_dataset
):
with pytest.raises(ValueError):
remote_dataset.pull(release=None, retry=True)

@patch("time.sleep", return_value=None)
def test_num_retries(self, mock_sleep, remote_dataset, pending_release):
with patch.object(remote_dataset, "get_release", return_value=pending_release):
with pytest.raises(ValueError):
remote_dataset.pull(release=pending_release, retry=True)
assert mock_sleep.call_count == 30 # 300 seconds / 10 seconds interval

@patch("time.sleep", return_value=None)
def test_raises_after_max_retry_duration(
self, mock_sleep, remote_dataset, pending_release
):
with patch.object(remote_dataset, "get_release", return_value=pending_release):
with pytest.raises(ValueError, match="is still processing after"):
remote_dataset.pull(release=pending_release, retry=True)


class TestPullNamingConvention:
def _test_pull_naming_convention(
Expand Down Expand Up @@ -1316,3 +1342,25 @@ def test_register_files_with_blocked_items(self, remote_dataset: RemoteDatasetV2
)
assert len(result["registered"]) == 0
assert len(result["blocked"]) == 1


@pytest.mark.usefixtures("file_read_write_test")
class TestGetReleases:
    """Tests for ``RemoteDataset.get_releases`` and its ``retry`` flag."""

    @patch("darwin.backend_v2.BackendV2.get_exports")
    def test_returns_unavailable_releases_when_retry_is_true(
        self, mock_get_exports, remote_dataset, releases_api_response
    ):
        """With ``retry=True``, pending (unavailable) releases are included."""
        mock_get_exports.return_value = releases_api_response
        releases = remote_dataset.get_releases(retry=True)
        assert len(releases) == 2
        assert all(isinstance(release, Release) for release in releases)

    @patch("darwin.backend_v2.BackendV2.get_exports")
    def test_omits_unavailable_releases_when_retry_is_false(
        self, mock_get_exports, remote_dataset, releases_api_response
    ):
        """With ``retry=False``, only available releases are returned."""
        mock_get_exports.return_value = releases_api_response
        # Exactly one release survives the availability filter.
        (only_release,) = remote_dataset.get_releases(retry=False)
        assert isinstance(only_release, Release)
Loading