diff --git a/darwin/importer/importer.py b/darwin/importer/importer.py
index 33f81979b..54ccfbdc7 100644
--- a/darwin/importer/importer.py
+++ b/darwin/importer/importer.py
@@ -30,7 +30,6 @@ from darwin.path_utils import is_properties_enabled, parse_metadata
 
 Unknown = Any  # type: ignore
 
-
 from tqdm import tqdm
 
 if TYPE_CHECKING:
@@ -675,64 +674,9 @@ def import_annotations(  # noqa: C901
     import_annotators: bool = False,
     import_reviewers: bool = False,
     overwrite: bool = False,
-    use_multi_cpu: bool = False,  # Set to False to give time to resolve MP behaviours
-    cpu_limit: Optional[int] = None,  # 0 because it's set later in logic
+    use_multi_cpu: bool = False,
+    cpu_limit: Optional[int] = None,
 ) -> None:
-    """
-    Imports the given given Annotations into the given Dataset.
-
-    Parameters
-    ----------
-    dataset : RemoteDataset
-        Dataset where the Annotations will be imported to.
-    importer : Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]]
-        Parsing module containing the logic to parse the given Annotation files given in
-        ``files_path``. See ``importer/format`` for a list of out of supported parsers.
-    file_paths : List[PathLike]
-        A list of ``Path``'s or strings containing the Annotations we wish to import.
-    append : bool
-        If ``True`` appends the given annotations to the datasets. If ``False`` will override them.
-        Incompatible with ``delete-for-empty``.
-    class_prompt : bool
-        If ``False`` classes will be created and added to the datasets without requiring a user's prompt.
-    delete_for_empty : bool, default: False
-        If ``True`` will use empty annotation files to delete all annotations from the remote file.
-        If ``False``, empty annotation files will simply be skipped.
-        Only works for V2 datasets.
-        Incompatible with ``append``.
-    import_annotators : bool, default: False
-        If ``True`` it will import the annotators from the files to the dataset, if available.
-        If ``False`` it will not import the annotators.
-    import_reviewers : bool, default: False
-        If ``True`` it will import the reviewers from the files to the dataset, if .
-        If ``False`` it will not import the reviewers.
-    overwrite : bool, default: False
-        If ``True`` it will bypass a warning that the import will overwrite the current annotations if any are present.
-        If ``False`` this warning will be skipped and the import will overwrite the current annotations without warning.
-    use_multi_cpu : bool, default: True
-        If ``True`` will use multiple available CPU cores to parse the annotation files.
-        If ``False`` will use only the current Python process, which runs in one core.
-        Processing using multiple cores is faster, but may slow down a machine also running other processes.
-        Processing with one core is slower, but will run well alongside other processes.
-    cpu_limit : int, default: 2 less than total cpu count
-        The maximum number of CPU cores to use when ``use_multi_cpu`` is ``True``.
-        If ``cpu_limit`` is greater than the number of available CPU cores, it will be set to the number of available cores.
-        If ``cpu_limit`` is less than 1, it will be set to CPU count - 2.
-        If ``cpu_limit`` is omitted, it will be set to CPU count - 2.
-
-    Raises
-    -------
-    ValueError
-
-        - If ``file_paths`` is not a list.
-        - If the application is unable to fetch any remote classes.
-        - If the application was unable to find/parse any annotation files.
-        - If the application was unable to fetch remote file list.
-
-    IncompatibleOptions
-
-        - If both ``append`` and ``delete_for_empty`` are specified as ``True``.
-    """
     console = Console(theme=_console_theme())
 
     if append and delete_for_empty:
@@ -778,7 +722,6 @@ def import_annotations(  # noqa: C901
     local_files = []
     local_files_missing_remotely = []
 
-    # ! Other place we can use multiprocessing - hard to pass in the importer though
     maybe_parsed_files: Optional[Iterable[dt.AnnotationFile]] = _find_and_parse(
         importer, file_paths, console, use_multi_cpu, cpu_limit
     )
@@ -793,12 +736,8 @@ def import_annotations(  # noqa: C901
     ]
 
     console.print("Fetching remote file list...", style="info")
-    # This call will only filter by filename; so can return a superset of matched files across different paths
-    # There is logic in this function to then include paths to narrow down to the single correct matching file
     remote_files: Dict[str, Tuple[str, str]] = {}
 
-    # Try to fetch files in large chunks; in case the filenames are too large and exceed the url size
-    # retry in smaller chunks
     chunk_size = 100
     while chunk_size > 0:
         try:
@@ -885,7 +824,6 @@ def import_annotations(  # noqa: C901
         for cls in local_classes_not_in_dataset:
             dataset.add_annotation_class(cls)
 
-    # Refetch classes to update mappings
     if local_classes_not_in_team or local_classes_not_in_dataset:
         maybe_remote_classes: List[dt.DictFreeForm] = dataset.fetch_remote_classes()
         if not maybe_remote_classes:
@@ -915,8 +853,6 @@ def import_annotations(  # noqa: C901
 
     def import_annotation(parsed_file):
         image_id, default_slot_name = remote_files[parsed_file.full_path]
-        # We need to check if name is not-None as Darwin JSON 1.0
-        # defaults to name=None
         if parsed_file.slots and parsed_file.slots[0].name:
             default_slot_name = parsed_file.slots[0].name
 
@@ -927,7 +863,7 @@ def import_annotation(parsed_file):
             image_id,
             remote_classes,
             attributes,
-            parsed_file.annotations,  # type: ignore
+            parsed_file.annotations,
             default_slot_name,
             dataset,
             append,
@@ -953,7 +889,6 @@ def process_local_file(local_path):
         else:
             parsed_files = imported_files
 
-        # remove files missing on the server
        missing_files = [
             missing_file.full_path for missing_file in local_files_missing_remotely
         ]
@@ -981,7 +916,6 @@ def process_local_file(local_path):
         if files_to_track:
             _warn_unsupported_annotations(files_to_track)
 
-            # Multi-threading for individual annotation files
             if use_multi_cpu:
                 with concurrent.futures.ThreadPoolExecutor(
                     max_workers=cpu_limit
@@ -990,7 +924,11 @@ def process_local_file(local_path):
                         executor.submit(import_annotation, file)
                         for file in files_to_track
                     ]
-                    for future in concurrent.futures.as_completed(futures):
+                    for future in tqdm(
+                        concurrent.futures.as_completed(futures),
+                        total=len(futures),
+                        desc="Importing files",
+                    ):
                         try:
                             future.result()
                         except Exception as exc:
@@ -998,23 +936,29 @@ def process_local_file(local_path):
                                 f"Generated an exception: {exc}", style="error"
                             )
             else:
-                for file in files_to_track:
+                for file in tqdm(files_to_track, desc="Importing files"):
                     import_annotation(file)
 
-    # Multi-threading the top-level loop
     if use_multi_cpu:
         with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_limit) as executor:
             futures = [
                 executor.submit(process_local_file, local_path)
                 for local_path in {local_file.path for local_file in local_files}
             ]
-            for future in concurrent.futures.as_completed(futures):
+            for future in tqdm(
+                concurrent.futures.as_completed(futures),
+                total=len(futures),
+                desc="Processing local files",
+            ):
                 try:
                     future.result()
                 except Exception as exc:
                     console.print(f"Generated an exception: {exc}", style="error")
     else:
-        for local_path in {local_file.path for local_file in local_files}:
+        for local_path in tqdm(
+            {local_file.path for local_file in local_files},
+            desc="Processing local files",
+        ):
             process_local_file(local_path)
 
 