diff --git a/darwin/cli_functions.py b/darwin/cli_functions.py index efde7456f..f5739bc80 100644 --- a/darwin/cli_functions.py +++ b/darwin/cli_functions.py @@ -899,6 +899,9 @@ def dataset_import( dataset_identifier=dataset_slug ) + if cpu_limit is not None: + use_multi_cpu = True + import_annotations( dataset, importer, diff --git a/darwin/dataset/release.py b/darwin/dataset/release.py index a8ce9e92d..239dbb50a 100644 --- a/darwin/dataset/release.py +++ b/darwin/dataset/release.py @@ -4,6 +4,7 @@ from typing import Any, Dict, Optional import requests + from darwin.dataset.identifier import DatasetIdentifier diff --git a/darwin/importer/importer.py b/darwin/importer/importer.py index be960eb3a..17da94b12 100644 --- a/darwin/importer/importer.py +++ b/darwin/importer/importer.py @@ -1,3 +1,4 @@ +import concurrent.futures import uuid from collections import defaultdict from logging import getLogger @@ -37,7 +38,6 @@ from darwin.dataset.remote_dataset import RemoteDataset from rich.console import Console -from rich.progress import track from rich.theme import Theme import darwin.datatypes as dt @@ -675,12 +675,11 @@ def import_annotations( # noqa: C901 import_annotators: bool = False, import_reviewers: bool = False, overwrite: bool = False, - use_multi_cpu: bool = False, # Set to False to give time to resolve MP behaviours - cpu_limit: Optional[int] = None, # 0 because it's set later in logic + use_multi_cpu: bool = False, + cpu_limit: Optional[int] = None, ) -> None: """ Imports the given given Annotations into the given Dataset. - Parameters ---------- dataset : RemoteDataset @@ -719,20 +718,17 @@ def import_annotations( # noqa: C901 If ``cpu_limit`` is greater than the number of available CPU cores, it will be set to the number of available cores. If ``cpu_limit`` is less than 1, it will be set to CPU count - 2. If ``cpu_limit`` is omitted, it will be set to CPU count - 2. - Raises ------- ValueError - - If ``file_paths`` is not a list. - If the application is unable to fetch any remote classes. - If the application was unable to find/parse any annotation files. - If the application was unable to fetch remote file list. - IncompatibleOptions - - If both ``append`` and ``delete_for_empty`` are specified as ``True``. """ + console = Console(theme=_console_theme()) if append and delete_for_empty: @@ -778,7 +774,6 @@ def import_annotations( # noqa: C901 local_files = [] local_files_missing_remotely = [] - # ! Other place we can use multiprocessing - hard to pass in the importer though maybe_parsed_files: Optional[Iterable[dt.AnnotationFile]] = _find_and_parse( importer, file_paths, console, use_multi_cpu, cpu_limit ) @@ -913,8 +908,34 @@ def import_annotations( # noqa: C901 if not continue_to_overwrite: return - # Need to re parse the files since we didn't save the annotations in memory - for local_path in set(local_file.path for local_file in local_files): # noqa: C401 + def import_annotation(parsed_file): + image_id, default_slot_name = remote_files[parsed_file.full_path] + if parsed_file.slots and parsed_file.slots[0].name: + default_slot_name = parsed_file.slots[0].name + + metadata_path = is_properties_enabled(parsed_file.path) + + errors, _ = _import_annotations( + dataset.client, + image_id, + remote_classes, + attributes, + parsed_file.annotations, + default_slot_name, + dataset, + append, + delete_for_empty, + import_annotators, + import_reviewers, + metadata_path, + ) + + if errors: + console.print(f"Errors importing {parsed_file.filename}", style="error") + for error in errors: + console.print(f"\t{error}", style="error") + + def process_local_file(local_path): imported_files: Union[List[dt.AnnotationFile], dt.AnnotationFile, None] = ( importer(local_path) ) @@ -925,7 +946,7 @@ def import_annotations( # noqa: C901 else: parsed_files = imported_files - # remove files missing on the server + # Remove files missing on the server missing_files = [ missing_file.full_path for missing_file in local_files_missing_remotely ] @@ -952,36 +973,55 @@ def import_annotations( # noqa: C901 ] if files_to_track: _warn_unsupported_annotations(files_to_track) - for parsed_file in track(files_to_track): - image_id, default_slot_name = remote_files[parsed_file.full_path] - # We need to check if name is not-None as Darwin JSON 1.0 - # defaults to name=None - if parsed_file.slots and parsed_file.slots[0].name: - default_slot_name = parsed_file.slots[0].name - - metadata_path = is_properties_enabled(parsed_file.path) - - errors, _ = _import_annotations( - dataset.client, - image_id, - remote_classes, - attributes, - parsed_file.annotations, # type: ignore - default_slot_name, - dataset, - append, - delete_for_empty, - import_annotators, - import_reviewers, - metadata_path, - ) - if errors: - console.print( - f"Errors importing {parsed_file.filename}", style="error" - ) - for error in errors: - console.print(f"\t{error}", style="error") + if use_multi_cpu: + with concurrent.futures.ThreadPoolExecutor( + max_workers=cpu_limit + ) as executor: + futures = [ + executor.submit(import_annotation, file) + for file in files_to_track + ] + for _ in tqdm( + concurrent.futures.as_completed(futures), + total=len(futures), + desc="Importing annotations from local file", + ): + future = next(concurrent.futures.as_completed(futures)) + try: + future.result() + except Exception as exc: + console.print( + f"Generated an exception: {exc}", style="error" + ) + else: + for file in tqdm( + files_to_track, desc="Importing annotations from local file" + ): + import_annotation(file) + + if use_multi_cpu: + with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_limit) as executor: + futures = [ + executor.submit(process_local_file, local_path) + for local_path in {local_file.path for local_file in local_files} + ] + for _ in tqdm( + concurrent.futures.as_completed(futures), + total=len(futures), + desc="Processing local annotation files", + ): + future = next(concurrent.futures.as_completed(futures)) + try: + future.result() + except Exception as exc: + console.print(f"Generated an exception: {exc}", style="error") + else: + for local_path in tqdm( + {local_file.path for local_file in local_files}, + desc="Processing local annotation files", + ): + process_local_file(local_path) def _get_multi_cpu_settings(