Skip to content

Commit

Permalink
1. FileManager keeps run_mode and it decided if to make the file oper…
Browse files Browse the repository at this point in the history
…ation or not

2. FileManager is now the one that log file operations instead of the caller
3. Fixes to tests
  • Loading branch information
niradar committed May 28, 2024
1 parent 38bcdf7 commit 61821de
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 69 deletions.
58 changes: 20 additions & 38 deletions df_finder3.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,14 @@ def check_and_update_filename(new_filename):
def copy_or_move_file(tgt_filepath: str, move_to: str, src_filepath: str, target: str, test_mode, move=True):
new_src_path = os.path.join(move_to, os.path.relpath(tgt_filepath, target))
new_src_dir = os.path.dirname(new_src_path)
fm = file_manager.FileManager()
if not os.path.exists(new_src_dir) and not test_mode:
fm = file_manager.FileManager(not test_mode)
if not os.path.exists(new_src_dir):
fm.make_dirs(new_src_dir)
new_filename = check_and_update_filename(new_src_path)
src_to_dst = f"{src_filepath} to {new_filename}"
if not test_mode:
if move:
fm.move_file(src_filepath, new_filename)
else:
fm.copy_file(src_filepath, new_filename)
logger.info(f"{'Moved' if move else 'Copied'} {src_to_dst}")
if move:
fm.move_file(src_filepath, new_filename)
else:
logger.info(f"Test Mode: Would {'move' if move else 'copy'} {src_to_dst}")
fm.copy_file(src_filepath, new_filename)
return new_filename


Expand Down Expand Up @@ -104,23 +99,21 @@ def clean_source_duplications(args, keys_to_clean=None, given_duplicates: Dict[s

unique_duplicate_files_found += 1
start_index = 1 if not keys_to_clean else 0
fm = file_manager.FileManager()
fm = file_manager.FileManager(args.run)
# Move all the other files to a new folder under the move_to folder
for src_filepath, _ in group[start_index:]:
new_src_path = os.path.join(source_dups_move_to, os.path.relpath(src_filepath, source))
new_src_dir = os.path.dirname(new_src_path)
if not os.path.exists(new_src_dir) and args.run:
if not os.path.exists(new_src_dir):
fm.make_dirs(new_src_dir)
new_filename = check_and_update_filename(new_src_path)
src_to_dst = f"{src_filepath} to {new_filename}"
if args.run:
fm.move_file(src_filepath, new_filename)
logger.info(f"Moved {src_to_dst}")
fm.move_file(src_filepath, new_filename)
duplicate_files_moved += 1

if unique_duplicate_files_found:
logger.info(
f"Cleaning source folder: Found {unique_duplicate_files_found} unique duplicate files in the source folder, moved {duplicate_files_moved} files to {source_dups_move_to}")
f"Cleaning source folder: Found {unique_duplicate_files_found} unique duplicate files in the source folder,"
f" moved {duplicate_files_moved} files to {source_dups_move_to}")
return unique_duplicate_files_found, duplicate_files_moved


Expand Down Expand Up @@ -168,7 +161,7 @@ def find_and_process_duplicates(args):
clean_source_duplications(args, source_duplicates_to_process.keys(), source_duplicates_to_process)) \
if source_duplicates_to_process else (0, 0)

deleted_source_folders = delete_empty_folders_in_tree(args.src) if args.run and args.delete_empty_folders else 0
deleted_source_folders = delete_empty_folders_in_tree(args.src, args.run) if args.run and args.delete_empty_folders else 0
return files_moved, files_created, deleted_source_folders, unique_source_duplicate_files_found, duplicate_source_files_moved


Expand Down Expand Up @@ -211,7 +204,7 @@ def collect_target_files(args):
return target_files


def delete_empty_folders_in_tree(base_path):
def delete_empty_folders_in_tree(base_path, run_mode):
folders_by_depth = {} # collect all folders in the source folder by depth
for root, dirs, files in os.walk(base_path, topdown=False):
if base_path == root:
Expand All @@ -221,14 +214,13 @@ def delete_empty_folders_in_tree(base_path):
folders_by_depth[depth] = []
folders_by_depth[depth].append(root)

fm = file_manager.FileManager()
fm = file_manager.FileManager(run_mode)
deleted_folders = 0
# delete empty folders starting from the deepest level excluding the base_path folder
for depth in sorted(folders_by_depth.keys(), reverse=True):
for folder in folders_by_depth[depth]:
if not os.listdir(folder):
fm.rmdir(folder)
logger.info(f"Deleted empty folder {folder}")
deleted_folders += 1
return deleted_folders

Expand Down Expand Up @@ -290,16 +282,12 @@ def parse_arguments(cust_args=None):


def validate_duplicate_files_destination(duplicate_files_destination, run_mode):
fm = file_manager.FileManager()
fm = file_manager.FileManager(run_mode)
if not os.path.isdir(duplicate_files_destination):
if run_mode:
try:
fm.make_dirs(duplicate_files_destination)
logger.info(f"Created destination folder {duplicate_files_destination}")
except Exception as e:
print_error(f"Error creating destination folder {duplicate_files_destination}: {e}")
else:
logger.info(f"Test Mode: Would create destination folder {duplicate_files_destination}")
try:
fm.make_dirs(duplicate_files_destination)
except Exception as e:
print_error(f"Error creating destination folder {duplicate_files_destination}: {e}")
return True


Expand All @@ -316,7 +304,7 @@ def output_results(args, deleted_source_folders, duplicate_source_files_moved, f
summary_header = "Summary (Test Mode):" if not args.run else "Summary:"
separator = "-" * max(len(summary_header), 40)
cache_hits = f"Hash requests: {hash_manager.persistent_cache_requests + hash_manager.temporary_cache_requests}," + \
f" Cache hits: {hash_manager.persistent_cache_hits + hash_manager.temporary_cache_hits}"
f" Cache hits: {hash_manager.persistent_cache_hits + hash_manager.temporary_cache_hits}"
logger.info(summary_header)
logger.info(separator)

Expand Down Expand Up @@ -345,11 +333,7 @@ def detect_pytest():

def main(args):
setup_logging()
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir(args.target)
fm.add_allowed_dir(args.src)
fm.add_allowed_dir(args.move_to)

file_manager.FileManager.reset_file_manager([args.target], [args.src, args.move_to], args.run)
validate_folder(args.src, "Source")
validate_folder(args.target, "Target")
validate_duplicate_files_destination(args.move_to, args.run)
Expand All @@ -362,8 +346,6 @@ def main(args):
hash_manager = HashManager(target_folder=args.target if not detect_pytest() else None)
if args.clear_cache:
hash_manager.clear_cache()


(files_moved, files_created, deleted_source_folders, unique_source_duplicate_files_found,
duplicate_source_files_moved) = find_and_process_duplicates(args)
hash_manager.save_data()
Expand Down
55 changes: 48 additions & 7 deletions file_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from pathlib import Path
import shutil
import os
import logging

logger = logging.getLogger(__name__)


class FileManagerError(Exception):
Expand All @@ -16,10 +19,12 @@ class FileManager:
_instance = None
protected_dirs = set()
allowed_dirs = set() # If set, only operations in these directories are allowed. Acts as a whitelist
run_mode = False

def __new__(cls, *args, **kwargs):
def __new__(cls, run_mode, *args, **kwargs):
if not cls._instance:
cls._instance = super(FileManager, cls).__new__(cls, *args, **kwargs)
cls._instance.run_mode = run_mode
return cls._instance

def add_protected_dir(self, dir_path):
Expand Down Expand Up @@ -60,7 +65,13 @@ def move_file(self, src, dst):
raise ProtectedPathError(
f"Operation not allowed: Attempt to move protected file or to protected directory: {src} -> {dst}")

shutil.move(src_path, dst_path)
src_to_dst = f"{src_path} to {dst_path}"
if self.run_mode:
shutil.move(src_path, dst_path)
logger.info(f"Moved {src_to_dst}")
else:
logger.info(f"Would have moved {src_to_dst}")

return True

def copy_file(self, src, dst):
Expand All @@ -71,7 +82,12 @@ def copy_file(self, src, dst):
raise ProtectedPathError(
f"Operation not allowed: Attempt to copy file to protected directory: {src} -> {dst}")

shutil.copy2(src_path, dst_path)
src_to_dst = f"{src_path} to {dst_path}"
if self.run_mode:
shutil.copy2(src_path, dst_path)
logger.info(f"Copied {src_to_dst}")
else:
logger.info(f"Would have copied {src_to_dst}")
return True

def delete_file(self, file_path):
Expand All @@ -80,7 +96,11 @@ def delete_file(self, file_path):
if self.is_protected_path(file_path):
raise ProtectedPathError(f"Operation not allowed: Attempt to delete protected file: {file_path}")

os.remove(file_path)
if self.run_mode:
os.remove(file_path)
logger.info(f"Deleted {file_path}")
else:
logger.info(f"Would have deleted {file_path}")
return True

def make_dirs(self, dir_path):
Expand All @@ -89,8 +109,11 @@ def make_dirs(self, dir_path):
if self.is_protected_path(dir_path):
raise ProtectedPathError(f"Operation not allowed: Attempt to create directory in protected path: "
f"{dir_path}")

os.makedirs(dir_path)
if self.run_mode:
os.makedirs(dir_path, exist_ok=True)
logger.info(f"Created directory {dir_path}")
else:
logger.info(f"Would have created directory {dir_path}")
return True

def rmdir(self, dir_path):
Expand All @@ -99,7 +122,11 @@ def rmdir(self, dir_path):
if self.is_protected_path(dir_path):
raise ProtectedPathError(f"Operation not allowed: Attempt to delete protected directory: {dir_path}")

os.rmdir(dir_path)
if self.run_mode:
shutil.rmtree(dir_path)
logger.info(f"Deleted directory {dir_path}")
else:
logger.info(f"Would have deleted directory {dir_path}")
return True

def reset_protected_dirs(self):
Expand All @@ -114,3 +141,17 @@ def reset_all(self):
self.reset_protected_dirs()
self.reset_allowed_dirs()
return self

# new version for reset_file_manager that gets the protected and allowed directories as arguments
@staticmethod
def reset_file_manager(protected_dirs, allowed_dirs, run_mode):
FileManager._instance = None
fm = FileManager(run_mode).reset_all()
for dir_path in protected_dirs:
fm.add_protected_dir(dir_path)
for dir_path in allowed_dirs:
fm.add_allowed_dir(dir_path)

def set_run_mode(self, run_mode):
self.run_mode = run_mode
return self
11 changes: 5 additions & 6 deletions tests/helpers_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def get_folder_path(folder):
res += subfolder[i] + os.sep
return res[:-1]


def copy_files(file_numbers, src_dir):
for file_number in file_numbers:
src_file = os.path.join(IMG_DIR, f"{file_number}.jpg")
Expand All @@ -55,6 +56,7 @@ def copy_files(file_numbers, src_dir):

@pytest.fixture
def setup_teardown():
setup_logging()
# Setup: Create the temporary directories
source_dir = os.path.join(TEMP_DIR, "source")
target_dir = os.path.join(TEMP_DIR, "target")
Expand All @@ -65,15 +67,13 @@ def setup_teardown():
HashManager.reset_instance()
HashManager(target_folder=target_dir, filename=hash_file)

fm = file_manager.FileManager().reset_all()
fm.add_protected_dir(target_dir)
fm.add_allowed_dir(source_dir)
fm.add_allowed_dir(move_to_dir)
# change file_manager.FileManager.reset_file_manager() to the new arguments
file_manager.FileManager.reset_file_manager([target_dir], [source_dir, move_to_dir], True)

os.makedirs(source_dir)
os.makedirs(target_dir)
os.makedirs(move_to_dir)
setup_logging()

yield source_dir, target_dir, move_to_dir, common_args

# Teardown: Delete the temporary directories
Expand Down Expand Up @@ -114,4 +114,3 @@ def simple_usecase_test(source_dir, target_dir, move_to_dir, max_files=3):
# Check no change to target
target_files = set(os.listdir(target_dir))
assert target_files == set([f"{i}.jpg" for i in range(1, max_files+1)]), "Target directory files have changed"

21 changes: 10 additions & 11 deletions tests/test_file_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import file_manager
from tests.helpers_testing import *
from pathlib import Path

Expand All @@ -8,7 +7,7 @@
def test_move_file(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
setup_test_files(range(1, 6), [2])
fm = file_manager.FileManager().reset_all()
fm = file_manager.FileManager(True).reset_all()
fm.add_protected_dir(target_dir)
file_to_move = os.path.join(source_dir, "1.jpg")
dst_file = os.path.join(target_dir, "1.jpg")
Expand Down Expand Up @@ -44,7 +43,7 @@ def test_move_file(setup_teardown):
def test_copy_file(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
setup_test_files(range(1, 6), [2, 3])
fm = file_manager.FileManager().reset_all()
fm = file_manager.FileManager(True).reset_all()
fm.add_protected_dir(target_dir)
file_to_copy = os.path.join(source_dir, "1.jpg")
dst_file = os.path.join(target_dir, "1.jpg")
Expand Down Expand Up @@ -88,7 +87,7 @@ def test_copy_file(setup_teardown):
def test_delete_file(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
setup_test_files(range(1, 6), [2, 3])
fm = file_manager.FileManager().reset_all()
fm = file_manager.FileManager(True).reset_all()
fm.add_protected_dir(target_dir)
file_to_delete = os.path.join(source_dir, "1.jpg")

Expand Down Expand Up @@ -120,7 +119,7 @@ def test_delete_file(setup_teardown):

def test_make_dirs(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
fm = file_manager.FileManager().reset_all()
fm = file_manager.FileManager(True).reset_all()
fm.add_protected_dir(target_dir)
dir_to_make = os.path.join(source_dir, "new_dir")

Expand Down Expand Up @@ -153,7 +152,7 @@ def test_make_dirs(setup_teardown):

def test_rmdir(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
fm = file_manager.FileManager().reset_all()
fm = file_manager.FileManager(True).reset_all()
fm.add_protected_dir(target_dir)
dir_to_remove = os.path.join(source_dir, "new_dir")
os.makedirs(dir_to_remove)
Expand Down Expand Up @@ -191,16 +190,16 @@ def test_rmdir(setup_teardown):

# The FileManager class should be a singleton, so we should not be able to create multiple instances of it.
def test_singleton():
fm1 = file_manager.FileManager()
fm2 = file_manager.FileManager()
fm1 = file_manager.FileManager(True)
fm2 = file_manager.FileManager(True)
assert fm1 is fm2
assert fm1 == fm2
assert fm1 is not None
assert fm2 is not None


def test_reset_protected_dirs():
fm = file_manager.FileManager().reset_all()
fm = file_manager.FileManager(True).reset_all()
fm.add_protected_dir("C:\\")
fm.add_protected_dir("D:\\")
fm.reset_protected_dirs()
Expand All @@ -209,7 +208,7 @@ def test_reset_protected_dirs():


def test_reset_allowed_dirs():
fm = file_manager.FileManager().reset_all()
fm = file_manager.FileManager(True).reset_all()
fm.add_allowed_dir("C:\\")
fm.add_allowed_dir("D:\\")
fm.reset_allowed_dirs()
Expand All @@ -218,7 +217,7 @@ def test_reset_allowed_dirs():


def test_add_protected_dir():
fm = file_manager.FileManager().reset_all()
fm = file_manager.FileManager(True).reset_all()
fm.add_protected_dir("C:\\")
fm.add_protected_dir("D:\\")
assert len(fm.protected_dirs) == 2
Expand Down
Loading

0 comments on commit 61821de

Please sign in to comment.