Skip to content

Commit

Permalink
[DAR-2045][External] Multi-threaded annotation imports (#880)
Browse files Browse the repository at this point in the history
* Made get_releases() return non-available releases

* Added status to the 'Release' class

* Multi-threaded annotation imports

* Better console messaging

* Fixed multi-threading with CLI annotation imports

* Undid accidental changes from different branch

* Undo accidental changes from different branch

* Re-added docstrings / comments
  • Loading branch information
JBWilkie committed Jul 3, 2024
1 parent f713588 commit feea232
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 41 deletions.
3 changes: 3 additions & 0 deletions darwin/cli_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,9 @@ def dataset_import(
dataset_identifier=dataset_slug
)

if cpu_limit is not None:
use_multi_cpu = True

import_annotations(
dataset,
importer,
Expand Down
1 change: 1 addition & 0 deletions darwin/dataset/release.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Dict, Optional

import requests

from darwin.dataset.identifier import DatasetIdentifier


Expand Down
122 changes: 81 additions & 41 deletions darwin/importer/importer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
import uuid
from collections import defaultdict
from logging import getLogger
Expand Down Expand Up @@ -37,7 +38,6 @@
from darwin.dataset.remote_dataset import RemoteDataset

from rich.console import Console
from rich.progress import track
from rich.theme import Theme

import darwin.datatypes as dt
Expand Down Expand Up @@ -675,12 +675,11 @@ def import_annotations( # noqa: C901
import_annotators: bool = False,
import_reviewers: bool = False,
overwrite: bool = False,
use_multi_cpu: bool = False, # Set to False to give time to resolve MP behaviours
cpu_limit: Optional[int] = None, # 0 because it's set later in logic
use_multi_cpu: bool = False,
cpu_limit: Optional[int] = None,
) -> None:
"""
Imports the given given Annotations into the given Dataset.
Parameters
----------
dataset : RemoteDataset
Expand Down Expand Up @@ -719,20 +718,17 @@ def import_annotations( # noqa: C901
If ``cpu_limit`` is greater than the number of available CPU cores, it will be set to the number of available cores.
If ``cpu_limit`` is less than 1, it will be set to CPU count - 2.
If ``cpu_limit`` is omitted, it will be set to CPU count - 2.
Raises
-------
ValueError
- If ``file_paths`` is not a list.
- If the application is unable to fetch any remote classes.
- If the application was unable to find/parse any annotation files.
- If the application was unable to fetch remote file list.
IncompatibleOptions
- If both ``append`` and ``delete_for_empty`` are specified as ``True``.
"""

console = Console(theme=_console_theme())

if append and delete_for_empty:
Expand Down Expand Up @@ -778,7 +774,6 @@ def import_annotations( # noqa: C901
local_files = []
local_files_missing_remotely = []

# ! Other place we can use multiprocessing - hard to pass in the importer though
maybe_parsed_files: Optional[Iterable[dt.AnnotationFile]] = _find_and_parse(
importer, file_paths, console, use_multi_cpu, cpu_limit
)
Expand Down Expand Up @@ -913,8 +908,34 @@ def import_annotations( # noqa: C901
if not continue_to_overwrite:
return

# Need to re parse the files since we didn't save the annotations in memory
for local_path in set(local_file.path for local_file in local_files): # noqa: C401
def import_annotation(parsed_file):
image_id, default_slot_name = remote_files[parsed_file.full_path]
if parsed_file.slots and parsed_file.slots[0].name:
default_slot_name = parsed_file.slots[0].name

metadata_path = is_properties_enabled(parsed_file.path)

errors, _ = _import_annotations(
dataset.client,
image_id,
remote_classes,
attributes,
parsed_file.annotations,
default_slot_name,
dataset,
append,
delete_for_empty,
import_annotators,
import_reviewers,
metadata_path,
)

if errors:
console.print(f"Errors importing {parsed_file.filename}", style="error")
for error in errors:
console.print(f"\t{error}", style="error")

def process_local_file(local_path):
imported_files: Union[List[dt.AnnotationFile], dt.AnnotationFile, None] = (
importer(local_path)
)
Expand All @@ -925,7 +946,7 @@ def import_annotations( # noqa: C901
else:
parsed_files = imported_files

# remove files missing on the server
# Remove files missing on the server
missing_files = [
missing_file.full_path for missing_file in local_files_missing_remotely
]
Expand All @@ -952,36 +973,55 @@ def import_annotations( # noqa: C901
]
if files_to_track:
_warn_unsupported_annotations(files_to_track)
for parsed_file in track(files_to_track):
image_id, default_slot_name = remote_files[parsed_file.full_path]
# We need to check if name is not-None as Darwin JSON 1.0
# defaults to name=None
if parsed_file.slots and parsed_file.slots[0].name:
default_slot_name = parsed_file.slots[0].name

metadata_path = is_properties_enabled(parsed_file.path)

errors, _ = _import_annotations(
dataset.client,
image_id,
remote_classes,
attributes,
parsed_file.annotations, # type: ignore
default_slot_name,
dataset,
append,
delete_for_empty,
import_annotators,
import_reviewers,
metadata_path,
)

if errors:
console.print(
f"Errors importing {parsed_file.filename}", style="error"
)
for error in errors:
console.print(f"\t{error}", style="error")
if use_multi_cpu:
with concurrent.futures.ThreadPoolExecutor(
max_workers=cpu_limit
) as executor:
futures = [
executor.submit(import_annotation, file)
for file in files_to_track
]
for _ in tqdm(
concurrent.futures.as_completed(futures),
total=len(futures),
desc="Importing annotations from local file",
):
future = next(concurrent.futures.as_completed(futures))
try:
future.result()
except Exception as exc:
console.print(
f"Generated an exception: {exc}", style="error"
)
else:
for file in tqdm(
files_to_track, desc="Importing annotations from local file"
):
import_annotation(file)

if use_multi_cpu:
with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_limit) as executor:
futures = [
executor.submit(process_local_file, local_path)
for local_path in {local_file.path for local_file in local_files}
]
for _ in tqdm(
concurrent.futures.as_completed(futures),
total=len(futures),
desc="Processing local annotation files",
):
future = next(concurrent.futures.as_completed(futures))
try:
future.result()
except Exception as exc:
console.print(f"Generated an exception: {exc}", style="error")
else:
for local_path in tqdm(
{local_file.path for local_file in local_files},
desc="Processing local annotation files",
):
process_local_file(local_path)


def _get_multi_cpu_settings(
Expand Down

0 comments on commit feea232

Please sign in to comment.