From 7c76c58ed0432ee5925a1efb6981b45ab45e2bff Mon Sep 17 00:00:00 2001 From: "HOME-2022\\User" Date: Tue, 28 May 2024 13:39:32 +0300 Subject: [PATCH] 1. new class - FileManager - to make sure no changes are done to target folder 2. tests to the class 3. modify the script to use it in all file and folder operations --- df_finder3.py | 30 +++++--- file_manager.py | 86 ++++++++++++++++++++++ tests/helpers_testing.py | 4 ++ tests/test_file_manager.py | 144 +++++++++++++++++++++++++++++++++++++ 4 files changed, 255 insertions(+), 9 deletions(-) create mode 100644 file_manager.py create mode 100644 tests/test_file_manager.py diff --git a/df_finder3.py b/df_finder3.py index fbf10eb..559f5af 100644 --- a/df_finder3.py +++ b/df_finder3.py @@ -3,12 +3,12 @@ import os import sys -import shutil from collections import defaultdict import argparse import time import logging import tqdm +import file_manager from hash_manager import HashManager from logging_config import setup_logging from typing import Dict, List, Tuple @@ -49,15 +49,16 @@ def check_and_update_filename(new_filename): def copy_or_move_file(tgt_filepath: str, move_to: str, src_filepath: str, target: str, test_mode, move=True): new_src_path = os.path.join(move_to, os.path.relpath(tgt_filepath, target)) new_src_dir = os.path.dirname(new_src_path) + fm = file_manager.FileManager() if not os.path.exists(new_src_dir) and not test_mode: - os.makedirs(new_src_dir) + fm.make_dirs(new_src_dir) new_filename = check_and_update_filename(new_src_path) src_to_dst = f"{src_filepath} to {new_filename}" if not test_mode: if move: - shutil.move(src_filepath, new_filename) + fm.move_file(src_filepath, new_filename) else: - shutil.copy(src_filepath, new_filename) + fm.copy_file(src_filepath, new_filename) logger.info(f"{'Moved' if move else 'Copied'} {src_to_dst}") else: logger.info(f"Test Mode: Would {'move' if move else 'copy'} {src_to_dst}") @@ -103,16 +104,17 @@ def clean_source_duplications(args, keys_to_clean=None, 
given_duplicates: Dict[s unique_duplicate_files_found += 1 start_index = 1 if not keys_to_clean else 0 + fm = file_manager.FileManager() # Move all the other files to a new folder under the move_to folder for src_filepath, _ in group[start_index:]: new_src_path = os.path.join(source_dups_move_to, os.path.relpath(src_filepath, source)) new_src_dir = os.path.dirname(new_src_path) if not os.path.exists(new_src_dir) and args.run: - os.makedirs(new_src_dir) + fm.make_dirs(new_src_dir) new_filename = check_and_update_filename(new_src_path) src_to_dst = f"{src_filepath} to {new_filename}" if args.run: - shutil.move(src_filepath, new_filename) + fm.move_file(src_filepath, new_filename) logger.info(f"Moved {src_to_dst}") duplicate_files_moved += 1 @@ -219,12 +221,13 @@ def delete_empty_folders_in_tree(base_path): folders_by_depth[depth] = [] folders_by_depth[depth].append(root) + fm = file_manager.FileManager() deleted_folders = 0 # delete empty folders starting from the deepest level excluding the base_path folder for depth in sorted(folders_by_depth.keys(), reverse=True): for folder in folders_by_depth[depth]: if not os.listdir(folder): - os.rmdir(folder) + fm.rmdir(folder) logger.info(f"Deleted empty folder {folder}") deleted_folders += 1 return deleted_folders @@ -286,10 +289,11 @@ def parse_arguments(cust_args=None): def validate_duplicate_files_destination(duplicate_files_destination, run_mode): + fm = file_manager.FileManager() if not os.path.isdir(duplicate_files_destination): if run_mode: try: - os.makedirs(duplicate_files_destination) + fm.make_dirs(duplicate_files_destination) logger.info(f"Created destination folder {duplicate_files_destination}") except Exception as e: print_error(f"Error creating destination folder {duplicate_files_destination}: {e}") @@ -308,10 +312,15 @@ def any_is_subfolder_of(folders: List[str]) -> bool: def output_results(args, deleted_source_folders, duplicate_source_files_moved, files_created, files_moved, hash_manager): + 
summary_header = "Summary (Test Mode):" if not args.run else "Summary:" + separator = "-" * max(len(summary_header), 40) cache_hits = f"Hash requests: {hash_manager.persistent_cache_requests + hash_manager.temporary_cache_requests}," + \ f" Cache hits: {hash_manager.persistent_cache_hits + hash_manager.temporary_cache_hits}" + logger.info(summary_header) + logger.info(separator) + logger.debug(cache_hits) - res_str = f'Summary{" (Test Mode)" if not args.run else ""}: Move: {files_moved} files, Create: {files_created} copies' + res_str = f'Move: {files_moved} files, Create: {files_created} copies' if duplicate_source_files_moved: res_str += f", Moved {duplicate_source_files_moved} duplicate files from the source folder" if deleted_source_folders: @@ -347,6 +356,9 @@ def main(args): hash_manager = HashManager(target_folder=args.target if not detect_pytest() else None) if args.clear_cache: hash_manager.clear_cache() + fm = file_manager.FileManager().reset_protected_dirs() + fm.add_protected_dir(args.target) + (files_moved, files_created, deleted_source_folders, unique_source_duplicate_files_found, duplicate_source_files_moved) = find_and_process_duplicates(args) hash_manager.save_data() diff --git a/file_manager.py b/file_manager.py new file mode 100644 index 0000000..3b83d4d --- /dev/null +++ b/file_manager.py @@ -0,0 +1,86 @@ +from pathlib import Path +import shutil +import os + + +class FileManagerError(Exception): + pass + + +class ProtectedPathError(FileManagerError): + def __init__(self, message): + super().__init__(message) + + +class FileManager: + _instance = None + protected_dirs = set() + + def __new__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super(FileManager, cls).__new__(cls, *args, **kwargs) + return cls._instance + + def add_protected_dir(self, dir_path): + protected_dir = Path(dir_path).resolve() + if protected_dir not in self.protected_dirs: + self.protected_dirs.add(protected_dir) + + def is_protected_path(self, path): + path 
= Path(path).resolve() + if self.protected_dirs is None: # This should never happen in real life + raise FileManagerError("Protected directories not set") + return any(path == protected_dir or protected_dir in path.parents for protected_dir in self.protected_dirs) + + def move_file(self, src, dst): + src_path = Path(src).resolve() + dst_path = Path(dst).resolve() + + if self.is_protected_path(src_path) or self.is_protected_path(dst_path): + raise ProtectedPathError( + f"Operation not allowed: Attempt to move protected file or to protected directory: {src} -> {dst}") + + shutil.move(src_path, dst_path) + return True + + def copy_file(self, src, dst): + src_path = Path(src).resolve() + dst_path = Path(dst).resolve() + + if self.is_protected_path(dst_path): + raise ProtectedPathError( + f"Operation not allowed: Attempt to copy file to protected directory: {src} -> {dst}") + + shutil.copy2(src_path, dst_path) + return True + + def delete_file(self, file_path): + file_path = Path(file_path).resolve() + + if self.is_protected_path(file_path): + raise ProtectedPathError(f"Operation not allowed: Attempt to delete protected file: {file_path}") + + os.remove(file_path) + return True + + def make_dirs(self, dir_path): + dir_path = Path(dir_path).resolve() + + if self.is_protected_path(dir_path): + raise ProtectedPathError(f"Operation not allowed: Attempt to create directory in protected path: {dir_path}") + + os.makedirs(dir_path) + return True + + def rmdir(self, dir_path): + dir_path = Path(dir_path).resolve() + + if self.is_protected_path(dir_path): + raise ProtectedPathError(f"Operation not allowed: Attempt to delete protected directory: {dir_path}") + + os.rmdir(dir_path) + return True + + def reset_protected_dirs(self): + self.protected_dirs = set() + return self diff --git a/tests/helpers_testing.py b/tests/helpers_testing.py index 38cc620..b9ce59a 100644 --- a/tests/helpers_testing.py +++ b/tests/helpers_testing.py @@ -4,6 +4,7 @@ from hash_manager import HashManager 
from logging_config import setup_logging +import file_manager # Define the base directory for the tests BASE_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -64,6 +65,9 @@ def setup_teardown(): HashManager.reset_instance() HashManager(target_folder=target_dir, filename=hash_file) + fm = file_manager.FileManager().reset_protected_dirs() + fm.add_protected_dir(target_dir) + os.makedirs(source_dir) os.makedirs(target_dir) os.makedirs(move_to_dir) diff --git a/tests/test_file_manager.py b/tests/test_file_manager.py new file mode 100644 index 0000000..6e8ac31 --- /dev/null +++ b/tests/test_file_manager.py @@ -0,0 +1,144 @@ +import file_manager +from tests.helpers_testing import * + +# FileManager is supposed to protect some directories from being moved, copied or deleted. + + +def test_move_file(setup_teardown): + source_dir, target_dir, move_to_dir, common_args = setup_teardown + setup_test_files(range(1, 6), [2]) + fm = file_manager.FileManager() + fm.add_protected_dir(target_dir) + file_to_move = os.path.join(source_dir, "1.jpg") + dst_file = os.path.join(target_dir, "1.jpg") + + # move to protected directory should fail + with pytest.raises(file_manager.ProtectedPathError): + fm.move_file(file_to_move, dst_file) + + # move from unprotected directory to unprotected directory should work + fm.move_file(file_to_move, os.path.join(move_to_dir, "1.jpg")) + assert os.path.exists(os.path.join(move_to_dir, "1.jpg")) + assert not os.path.exists(file_to_move) + + # move from protected directory should fail too + file_to_move = os.path.join(target_dir, "2.jpg") + with pytest.raises(file_manager.ProtectedPathError): + fm.move_file(file_to_move, os.path.join(move_to_dir, "2.jpg")) + assert os.path.exists(file_to_move) + assert not os.path.exists(os.path.join(move_to_dir, "2.jpg")) + + +def test_copy_file(setup_teardown): + source_dir, target_dir, move_to_dir, common_args = setup_teardown + setup_test_files(range(1, 6), [2, 3]) + fm = file_manager.FileManager() + 
fm.add_protected_dir(target_dir) + file_to_copy = os.path.join(source_dir, "1.jpg") + dst_file = os.path.join(target_dir, "1.jpg") + + # copy from unprotected directory to protected directory should fail + with pytest.raises(file_manager.ProtectedPathError): + fm.copy_file(file_to_copy, dst_file) + + # copy from unprotected directory to unprotected directory should work + fm.copy_file(file_to_copy, os.path.join(move_to_dir, "1.jpg")) + assert os.path.exists(os.path.join(move_to_dir, "1.jpg")) + assert os.path.exists(file_to_copy) + + # copy from protected directory to unprotected directory should work + file_to_copy = os.path.join(target_dir, "2.jpg") + fm.copy_file(file_to_copy, os.path.join(move_to_dir, "2.jpg")) + assert os.path.exists(os.path.join(move_to_dir, "2.jpg")) + assert os.path.exists(file_to_copy) + + # copy from protected directory to protected directory should fail + file_to_copy = os.path.join(target_dir, "3.jpg") + with pytest.raises(file_manager.ProtectedPathError): + fm.copy_file(file_to_copy, os.path.join(target_dir, "4.jpg")) + assert os.path.exists(file_to_copy) + assert not os.path.exists(os.path.join(target_dir, "4.jpg")) + + +def test_delete_file(setup_teardown): + source_dir, target_dir, move_to_dir, common_args = setup_teardown + setup_test_files(range(1, 6), [2, 3]) + fm = file_manager.FileManager() + fm.add_protected_dir(target_dir) + file_to_delete = os.path.join(source_dir, "1.jpg") + + # delete from unprotected directory should work + fm.delete_file(file_to_delete) + assert not os.path.exists(file_to_delete) + + # delete from protected directory should fail + file_to_delete = os.path.join(target_dir, "2.jpg") + with pytest.raises(file_manager.ProtectedPathError): + fm.delete_file(file_to_delete) + assert os.path.exists(file_to_delete) + + +def test_make_dirs(setup_teardown): + source_dir, target_dir, move_to_dir, common_args = setup_teardown + fm = file_manager.FileManager() + fm.add_protected_dir(target_dir) + dir_to_make = 
os.path.join(source_dir, "new_dir") + + # make dir in unprotected directory should work + fm.make_dirs(dir_to_make) + assert os.path.exists(dir_to_make) + + # make dir in protected directory should fail + dir_to_make = os.path.join(target_dir, "new_dir") + with pytest.raises(file_manager.ProtectedPathError): + fm.make_dirs(dir_to_make) + assert not os.path.exists(dir_to_make) + + # makedirs should work with multiple levels + dir_to_make = os.path.join(source_dir, "another_new_dir", "sub_dir", "sub_sub_dir") + fm.make_dirs(dir_to_make) + assert os.path.exists(dir_to_make) + + +def test_rmdir(setup_teardown): + source_dir, target_dir, move_to_dir, common_args = setup_teardown + fm = file_manager.FileManager() + fm.add_protected_dir(target_dir) + dir_to_remove = os.path.join(source_dir, "new_dir") + os.makedirs(dir_to_remove) + + # remove dir in unprotected directory should work + fm.rmdir(dir_to_remove) + assert not os.path.exists(dir_to_remove) + + # remove dir in protected directory should fail + dir_to_remove = os.path.join(target_dir, "new_dir") + os.makedirs(dir_to_remove) + with pytest.raises(file_manager.ProtectedPathError): + fm.rmdir(dir_to_remove) + assert os.path.exists(dir_to_remove) + + # rmdir should work with multiple levels + dir_to_remove = os.path.join(source_dir, "another_new_dir", "sub_dir", "sub_sub_dir") + os.makedirs(dir_to_remove) + fm.rmdir(dir_to_remove) + assert not os.path.exists(dir_to_remove) + + +# The FileManager class should be a singleton, so we should not be able to create multiple instances of it. +def test_singleton(): + fm1 = file_manager.FileManager() + fm2 = file_manager.FileManager() + assert fm1 is fm2 + assert fm1 == fm2 + assert fm1 is not None + assert fm2 is not None + + +def test_reset_protected_dirs(): + fm = file_manager.FileManager() + fm.add_protected_dir("C:\\") + fm.add_protected_dir("D:\\") + fm.reset_protected_dirs() + assert len(fm.protected_dirs) == 0 + assert fm.protected_dirs == set()