Commit

Fixed console output
JBWilkie committed Jul 1, 2024
1 parent 3342ae1 commit 9b4c31f
Showing 1 changed file with 20 additions and 74 deletions.
94 changes: 20 additions & 74 deletions darwin/importer/importer.py
@@ -30,7 +30,6 @@
from darwin.path_utils import is_properties_enabled, parse_metadata

Unknown = Any # type: ignore

from tqdm import tqdm

if TYPE_CHECKING:
@@ -675,64 +674,9 @@ def import_annotations( # noqa: C901
import_annotators: bool = False,
import_reviewers: bool = False,
overwrite: bool = False,
use_multi_cpu: bool = False, # Set to False to give time to resolve MP behaviours
cpu_limit: Optional[int] = None, # 0 because it's set later in logic
use_multi_cpu: bool = False,
cpu_limit: Optional[int] = None,
) -> None:
"""
Imports the given Annotations into the given Dataset.
Parameters
----------
dataset : RemoteDataset
Dataset where the Annotations will be imported to.
importer : Callable[[Path], Union[List[dt.AnnotationFile], dt.AnnotationFile, None]]
Parsing module containing the logic to parse the given Annotation files given in
``files_path``. See ``importer/format`` for a list of supported parsers.
file_paths : List[PathLike]
A list of ``Path``'s or strings containing the Annotations we wish to import.
append : bool
If ``True`` appends the given annotations to the dataset. If ``False`` overwrites them.
Incompatible with ``delete-for-empty``.
class_prompt : bool
If ``False`` classes will be created and added to the dataset without requiring a user's prompt.
delete_for_empty : bool, default: False
If ``True`` will use empty annotation files to delete all annotations from the remote file.
If ``False``, empty annotation files will simply be skipped.
Only works for V2 datasets.
Incompatible with ``append``.
import_annotators : bool, default: False
If ``True`` it will import the annotators from the files to the dataset, if available.
If ``False`` it will not import the annotators.
import_reviewers : bool, default: False
If ``True`` it will import the reviewers from the files to the dataset, if available.
If ``False`` it will not import the reviewers.
overwrite : bool, default: False
If ``True`` it will bypass a warning that the import will overwrite the current annotations if any are present.
If ``False`` this warning will be shown, and the current annotations will not be overwritten without confirmation.
use_multi_cpu : bool, default: False
If ``True`` will use multiple available CPU cores to parse the annotation files.
If ``False`` will use only the current Python process, which runs in one core.
Processing using multiple cores is faster, but may slow down a machine also running other processes.
Processing with one core is slower, but will run well alongside other processes.
cpu_limit : int, default: 2 less than total cpu count
The maximum number of CPU cores to use when ``use_multi_cpu`` is ``True``.
If ``cpu_limit`` is greater than the number of available CPU cores, it will be set to the number of available cores.
If ``cpu_limit`` is less than 1, it will be set to CPU count - 2.
If ``cpu_limit`` is omitted, it will be set to CPU count - 2.
Raises
-------
ValueError
- If ``file_paths`` is not a list.
- If the application is unable to fetch any remote classes.
- If the application was unable to find/parse any annotation files.
- If the application was unable to fetch remote file list.
IncompatibleOptions
- If both ``append`` and ``delete_for_empty`` are specified as ``True``.
"""
console = Console(theme=_console_theme())

if append and delete_for_empty:
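
The docstring above fixes the resolution rules for ``cpu_limit`` before any workers are spawned. A minimal sketch of those rules in isolation, assuming a standalone helper (the name _resolve_cpu_limit is hypothetical and not part of this diff):

import multiprocessing

def _resolve_cpu_limit(cpu_limit=None):
    # Rules from the docstring: omitted or < 1 -> CPU count - 2,
    # and never more than the number of available cores.
    available = multiprocessing.cpu_count()
    if cpu_limit is None or cpu_limit < 1:
        return max(1, available - 2)  # max() guards 1- and 2-core machines (an assumption)
    return min(cpu_limit, available)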
@@ -778,7 +722,6 @@ def import_annotations( # noqa: C901
local_files = []
local_files_missing_remotely = []

# ! Other place we can use multiprocessing - hard to pass in the importer though
maybe_parsed_files: Optional[Iterable[dt.AnnotationFile]] = _find_and_parse(
importer, file_paths, console, use_multi_cpu, cpu_limit
)
@@ -793,12 +736,8 @@ def import_annotations( # noqa: C901
]

console.print("Fetching remote file list...", style="info")
# This call will only filter by filename; so can return a superset of matched files across different paths
# There is logic in this function to then include paths to narrow down to the single correct matching file
remote_files: Dict[str, Tuple[str, str]] = {}

# Try to fetch files in large chunks; in case the filenames are too large and exceed the url size
# retry in smaller chunks
chunk_size = 100
while chunk_size > 0:
try:
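
The chunking loop above shrinks the request size on failure: it first asks for the remote file list 100 filenames at a time and, if a request exceeds the URL size limit, retries with smaller chunks. A self-contained sketch of that pattern, with a hypothetical fetch_chunk callable and error type standing in for the Darwin client call (the halving step is also an assumption; the actual decrement falls outside the displayed hunk):

class ChunkTooLargeError(Exception):
    # Hypothetical error signalling an oversized request URL.
    pass

def fetch_in_chunks(filenames, fetch_chunk, initial_chunk_size=100):
    chunk_size = initial_chunk_size
    while chunk_size > 0:
        try:
            results = {}
            for i in range(0, len(filenames), chunk_size):
                results.update(fetch_chunk(filenames[i : i + chunk_size]))
            return results
        except ChunkTooLargeError:
            chunk_size //= 2  # retry the whole listing with smaller chunks
    raise ValueError("Unable to fetch remote file list")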
@@ -885,7 +824,6 @@ def import_annotations( # noqa: C901
for cls in local_classes_not_in_dataset:
dataset.add_annotation_class(cls)

# Refetch classes to update mappings
if local_classes_not_in_team or local_classes_not_in_dataset:
maybe_remote_classes: List[dt.DictFreeForm] = dataset.fetch_remote_classes()
if not maybe_remote_classes:
@@ -915,8 +853,6 @@ def import_annotations( # noqa: C901

def import_annotation(parsed_file):
image_id, default_slot_name = remote_files[parsed_file.full_path]
# We need to check if name is not-None as Darwin JSON 1.0
# defaults to name=None
if parsed_file.slots and parsed_file.slots[0].name:
default_slot_name = parsed_file.slots[0].name
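
The two comment lines removed here carried the rationale for the truthiness check: Darwin JSON 1.0 annotation files default a slot's name to None, so the remote default should only be displaced by a real value. The guard in isolation, as a minimal sketch (the slot objects are stubbed with a namedtuple; choose_slot_name is a hypothetical name):

from collections import namedtuple

Slot = namedtuple("Slot", ["name"])

def choose_slot_name(slots, default_slot_name):
    # Keep the remote default unless the file supplies a non-None slot name
    # (Darwin JSON 1.0 defaults to name=None).
    if slots and slots[0].name:
        return slots[0].name
    return default_slot_name

assert choose_slot_name([Slot(name=None)], "slot-0") == "slot-0"
assert choose_slot_name([Slot(name="scan")], "slot-0") == "scan"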

@@ -927,7 +863,7 @@ def import_annotation(parsed_file):
image_id,
remote_classes,
attributes,
parsed_file.annotations, # type: ignore
parsed_file.annotations,
default_slot_name,
dataset,
append,
@@ -953,7 +889,6 @@ def process_local_file(local_path):
else:
parsed_files = imported_files

# remove files missing on the server
missing_files = [
missing_file.full_path for missing_file in local_files_missing_remotely
]
@@ -981,7 +916,6 @@ def process_local_file(local_path):
if files_to_track:
_warn_unsupported_annotations(files_to_track)

# Multi-threading for individual annotation files
if use_multi_cpu:
with concurrent.futures.ThreadPoolExecutor(
max_workers=cpu_limit
@@ -990,31 +924,43 @@ def process_local_file(local_path):
executor.submit(import_annotation, file)
for file in files_to_track
]
for future in concurrent.futures.as_completed(futures):
for _ in tqdm(
concurrent.futures.as_completed(futures),
total=len(futures),
desc="Importing files",
):
future = next(concurrent.futures.as_completed(futures))
try:
future.result()
except Exception as exc:
console.print(
f"Generated an exception: {exc}", style="error"
)
else:
for file in files_to_track:
for file in tqdm(files_to_track, desc="Importing files"):
import_annotation(file)

# Multi-threading the top-level loop
if use_multi_cpu:
with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_limit) as executor:
futures = [
executor.submit(process_local_file, local_path)
for local_path in {local_file.path for local_file in local_files}
]
for future in concurrent.futures.as_completed(futures):
for _ in tqdm(
concurrent.futures.as_completed(futures),
total=len(futures),
desc="Processing local files",
):
future = next(concurrent.futures.as_completed(futures))
try:
future.result()
except Exception as exc:
console.print(f"Generated an exception: {exc}", style="error")
else:
for local_path in {local_file.path for local_file in local_files}:
for local_path in tqdm(
{local_file.path for local_file in local_files},
desc="Processing local files",
):
process_local_file(local_path)
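
The progress reporting added in this commit comes from wrapping concurrent.futures.as_completed in tqdm, so the bar advances as each future finishes. A minimal, runnable sketch of that pattern in isolation (the worker and inputs are invented for illustration):

import concurrent.futures

from tqdm import tqdm

def _square(n):  # stand-in for import_annotation / process_local_file
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(_square, n) for n in range(100)]
    for future in tqdm(
        concurrent.futures.as_completed(futures),
        total=len(futures),  # tqdm needs an explicit total to render a full bar
        desc="Importing files",
    ):
        try:
            future.result()  # re-raises any exception from the worker
        except Exception as exc:
            print(f"Generated an exception: {exc}")

One design note: the sketch consumes the futures yielded by the tqdm-wrapped iterator itself, whereas the committed loop discards them and calls next() on a fresh as_completed iterator each pass; iterating directly guarantees every future's result (and any exception) is observed exactly once.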


