diff --git a/komga_cover_extractor.py b/komga_cover_extractor.py index e0f4d3e..703e2d1 100644 --- a/komga_cover_extractor.py +++ b/komga_cover_extractor.py @@ -43,7 +43,7 @@ from settings import * # Version of the script -script_version = (2, 4, 18) +script_version = (2, 4, 19) script_version_text = "v{}.{}.{}".format(*script_version) # Paths = existing library @@ -686,11 +686,15 @@ def get_file_size(file_path): # Recursively gets all the folders in a directory def get_all_folders_recursively_in_dir(dir_path): results = [] + for root, dirs, files in scandir.walk(dir_path): if root in download_folders or root in paths: continue - results.append(root) + folder_info = {"root": root, "dirs": dirs, "files": files} + + results.append(folder_info) + return results @@ -830,9 +834,11 @@ def on_created(self, event): return None # Get a list of all files in the root directory and its subdirectories. - files = [] - for x in download_folders: - files.extend(get_all_files_recursively_in_dir_watchdog(x)) + files = [ + file + for folder in download_folders + for file in get_all_files_recursively_in_dir_watchdog(folder) + ] # Check if all files in the root directory and its subdirectories are fully transferred. while True: @@ -878,11 +884,13 @@ def on_created(self, event): time.sleep(watchdog_discover_new_files_check_interval) # The current list of files in the root directory and its subdirectories. - new_files = [] - for x in download_folders: - new_files.extend( - get_all_files_recursively_in_dir_watchdog(x) + new_files = [ + file + for folder in download_folders + for file in get_all_files_recursively_in_dir_watchdog( + folder ) + ] # If any new files started transferring while we were checking the current files, # then we have more files to check. @@ -904,18 +912,11 @@ def on_created(self, event): # Proceed with the next steps here. print("\nAll files are transferred.") - new_transferred_dirs = [] - - if transferred_dirs: - # if it's already a folder object, then just add it to the new list - for x in transferred_dirs: - if isinstance(x, Folder): - new_transferred_dirs.append(x) - # if it's not a folder object, then make it a folder object - elif not isinstance(x, Folder): - new_transferred_dirs.append(create_folder_object(x)) - - transferred_dirs = new_transferred_dirs + # Make sure all items are a folder object + transferred_dirs = [ + create_folder_object(x) if not isinstance(x, Folder) else x + for x in transferred_dirs + ] except Exception as e: send_message( @@ -994,48 +995,43 @@ def on_created(self, event): send_message("\nWatching for changes... (WATCHDOG)", discord=False) -# Read all the lines of a text file and return them -def get_lines_from_file(file_path, ignore=[], ignore_paths_not_in_paths=False): +# Read all the lines from a text file, excluding specified lines. +def get_lines_from_file(file_path, ignore=[], check_paths=False): # Initialize an empty list to store the lines of the file results = [] try: # Open the file in read mode with open(file_path, "r") as file: - # If ignore_paths_not_in_paths flag is True - if ignore_paths_not_in_paths: - # Iterate over each line in the file - for line in file: - # Strip whitespace from the line - line = line.strip() - # If the line is not empty, not in ignore, starts with any of the strings in paths, and not already in results, add it to the list - if ( - line - and line not in ignore - and line.startswith(tuple(paths)) - and line not in results - ): - results.append(line) - # If ignore_paths_not_in_paths flag is False (default) - else: - # Iterate over each line in the file - for line in file: - # Strip whitespace from the line - line = line.strip() - # If the line is not empty and not in ignore, add it to the list - if line and line not in ignore: - results.append(line) - # If the file is not found + # Iterate over each line in the file + for line in file: + # Strip whitespace from the line + line = line.strip() + + if not line: + continue + + if line in ignore: + continue + + if check_paths and not line.startswith(tuple(paths)): + continue + + if line in results: + continue + + results.append(line) + + # Handle file not found exception except FileNotFoundError as e: # Print an error message and return an empty list send_message(f"File not found: {file_path}." + "\n" + str(e), error=True) return [] - # If any other exception is raised + + # Handle other exceptions except Exception as ex: # Print an error message and return an empty list - send_message( - f"An error occured while reading {file_path}." + "\n" + str(ex), error=True - ) + send_message(f"Error reading {file_path}." + "\n" + str(ex), error=True) return [] # Return the list of lines read from the file @@ -1198,13 +1194,11 @@ def process_single_type_path(path_to_process): process_auto_classification() path_obj = Path( path_str, - path_formats=path_formats if path_formats else [], - path_extensions=path_extensions if path_extensions else [], - library_types=path_library_types if path_library_types else [], - translation_source_types=path_translation_source_types - if path_translation_source_types - else [], - source_languages=path_source_languages if path_source_languages else [], + path_formats=path_formats or [], + path_extensions=path_extensions or [], + library_types=path_library_types or [], + translation_source_types=path_translation_source_types or [], + source_languages=path_source_languages or [], ) else: # process all paths except for the first one @@ -1213,13 +1207,12 @@ def process_single_type_path(path_to_process): path_obj = Path( path_str, - path_formats=path_formats if path_formats else file_formats, - path_extensions=path_extensions if path_extensions else file_extensions, - library_types=path_library_types if path_library_types else library_types, + path_formats=path_formats or file_formats, + path_extensions=path_extensions or file_extensions, + library_types=path_library_types or library_types, translation_source_types=path_translation_source_types - if path_translation_source_types - else translation_source_types, - source_languages=path_source_languages if path_source_languages else [], + or translation_source_types, + source_languages=path_source_languages or [], ) if not is_download_folders: @@ -1339,8 +1332,10 @@ def parse_my_args(): if download_folder: if r"\1" in download_folder[0]: split_download_folders = download_folder[0].split(r"\1") - for split_download_folder in split_download_folders: - new_download_folders.append([split_download_folder]) + new_download_folders.extend( + [split_download_folder] + for split_download_folder in split_download_folders + ) else: new_download_folders.append(download_folder) @@ -1361,9 +1356,9 @@ def parse_my_args(): if download_folders_with_types: print("\n\tdownload_folders_with_types:") for item in download_folders_with_types: - print("\t\tpath: " + str(item.path)) - print("\t\t\tformats: " + str(item.path_formats)) - print("\t\t\textensions: " + str(item.path_extensions)) + print(f"\t\tpath: {str(item.path)}") + print(f"\t\t\tformats: {str(item.path_formats)}") + print(f"\t\t\textensions: {str(item.path_extensions)}") if parser.watchdog: if parser.watchdog.lower() == "true": @@ -1378,13 +1373,11 @@ def parse_my_args(): if parser.paths is not None: new_paths = [] for path in parser.paths: - if path: - if r"\1" in path[0]: - split_paths = path[0].split(r"\1") - for split_path in split_paths: - new_paths.append([split_path]) - else: - new_paths.append(path) + if path and r"\1" in path[0]: + split_paths = path[0].split(r"\1") + new_paths.extend([split_path] for split_path in split_paths) + else: + new_paths.append(path) parser.paths = new_paths print("\tpaths:") @@ -1397,9 +1390,9 @@ def parse_my_args(): if paths_with_types: print("\n\tpaths_with_types:") for item in paths_with_types: - print("\t\tpath: " + str(item.path)) - print("\t\t\tformats: " + str(item.path_formats)) - print("\t\t\textensions: " + str(item.path_extensions)) + print(f"\t\tpath: {str(item.path)}") + print(f"\t\t\tformats: {str(item.path_formats)}") + print(f"\t\t\textensions: {str(item.path_extensions)}") print("\twatchdog: " + str(watchdog_toggle)) @@ -3008,7 +3001,7 @@ def get_file_part(file, chapter=False, series_name=None, subtitle=None): # remove the x or # from the string result = rx_remove_x_hash.sub("", part_search.group(0)) number = set_num_as_float_or_int(result) - if number: + if number != "": result = number return result @@ -3153,7 +3146,7 @@ def upgrade_to_volume_class( if file_obj.volume_number != "": if ( - file_obj.volume_part + file_obj.volume_part != "" and not isinstance(file_obj.volume_number, list) and int(file_obj.volume_number) == file_obj.volume_number ): @@ -5723,10 +5716,21 @@ def group_similar_series(messages_to_send): print("\nChecking download folders for items to match to existing library...") for download_folder in download_folders: if os.path.exists(download_folder): + # Get all the paths + folders = get_all_folders_recursively_in_dir(download_folder) + + # Reverse the list so we start with the deepest folders + # Helps when purging empty folders, since it won't purge a folder containing subfolders + folders.reverse() + # an array of unmatched items, used for skipping subsequent series # items that won't match unmatched_series = [] - for root, dirs, files in scandir.walk(download_folder): + for folder in folders: + root = folder["root"] + dirs = folder["dirs"] + files = folder["files"] + print("\n" + root) clean = None if ( @@ -5757,10 +5761,15 @@ def group_similar_series(messages_to_send): ) ) - # sort them by the index number - volumes = sorted( - volumes, key=lambda x: get_sort_key(x.index_number) - ) + # check that all volumes' index numbers aren't strings + if any(isinstance(item.index_number, str) for item in volumes): + # sort alphabetically by the file name + volumes = sorted(volumes, key=lambda x: x.name) + else: + # sort by the index number + volumes = sorted( + volumes, key=lambda x: get_sort_key(x.index_number) + ) exclude = None similar.cache_clear() @@ -5774,7 +5783,7 @@ def group_similar_series(messages_to_send): ) continue - if not file.volume_number: + if file.volume_number == "": print( "\tSkipping: " + file.name @@ -6902,24 +6911,31 @@ def group_similar_series(messages_to_send): print("No match found.") except Exception as e: send_message(str(e), error=True) + + # purge any empty folders + if folders: + for folder in folders: + check_and_delete_empty_folder(folder["root"]) + if grouped_notifications: send_discord_message( None, grouped_notifications, ) + webhook_use = None if messages_to_send: grouped_by_series_names = group_similar_series(messages_to_send) messages_to_send = [] if grouped_by_series_names: # sort them alphabetically by series name - grouped_by_series_names.sort(key=lambda x: x["series_name"]) + # grouped_by_series_names.sort(key=lambda x: x["series_name"]) for grouped_by_series_name in grouped_by_series_names: # sort the group's messages lowest to highest by the number field # the number can be a float or an array of floats - grouped_by_series_name["messages"].sort( - key=lambda x: x.fields[0]["value"].split(",")[0] - ) + # grouped_by_series_name["messages"].sort( + # key=lambda x: x.fields[0]["value"].split(",")[0] + # ) if output_chapter_covers_to_discord: for message in grouped_by_series_name["messages"][:]: cover = find_and_extract_cover( @@ -7301,68 +7317,63 @@ def search_for_regex_in_folder_name(folder_name): folders.reverse() for folder in folders: - if not os.path.isdir(folder): - continue + root = folder["root"] + dirs = folder["dirs"] + files = folder["files"] - if folder in download_folders: + if not os.path.isdir(root): continue - dirs = [] - files = [] - for item in os.listdir(folder): - path = os.path.join(folder, item) - if os.path.isdir(path): - dirs.append(item) - elif os.path.isfile(path): - files.append(item) + if root in download_folders: + continue try: clean = None if ( watchdog_toggle and download_folders - and any(x for x in download_folders if folder.startswith(x)) + and any(x for x in download_folders if root.startswith(x)) ): clean = clean_and_sort( - folder, + root, files, dirs, just_these_files=transferred_files, just_these_dirs=transferred_dirs, ) else: - clean = clean_and_sort(folder, files, dirs) + clean = clean_and_sort(root, files, dirs) files, dirs = clean[0], clean[1] volumes = upgrade_to_volume_class( - upgrade_to_file_class(files, folder), folder + upgrade_to_file_class(files, root), root ) matching = [] - dirname = os.path.dirname(folder) - basename = os.path.basename(folder) + dirname = os.path.dirname(root) + basename = os.path.basename(root) done = False volume_one = None volume_one_series_name = None # Main: Rename based on common series_name from volumes if volumes: - done = rename_based_on_volumes(folder) + done = rename_based_on_volumes(root) if ( not done and ( not volume_one_series_name or volume_one_series_name != basename ) and dirname in download_folders - and not re.search(basename, folder, re.IGNORECASE) + and not re.search(basename, root, re.IGNORECASE) ): - done = rename_based_on_brackets(folder) + done = rename_based_on_brackets(root) except Exception as e: send_message( "Error renaming folder: " + str(e), error=True, ) - check_and_delete_empty_folder(folder) + check_and_delete_empty_folder(root) print("\nLooking for folders to rename...") print("\tDownload Paths:") @@ -11676,7 +11687,7 @@ def main(): cached_paths = get_lines_from_file( os.path.join(LOGS_DIR, "cached_paths.txt"), ignore=paths + download_folders, - ignore_paths_not_in_paths=True, + check_paths=True, ) # Cache the paths if the user doesn't have a cached_paths.txt file