Skip to content

Commit

Permalink
1. new class - FileManager - to make sure no changes are made to the target folder
Browse files Browse the repository at this point in the history

2. tests to the class
3. modify the script to use it in all file and folder operations
  • Loading branch information
niradar committed May 28, 2024
1 parent e2049ea commit 7c76c58
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 9 deletions.
30 changes: 21 additions & 9 deletions df_finder3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

import os
import sys
import shutil
from collections import defaultdict
import argparse
import time
import logging
import tqdm
import file_manager
from hash_manager import HashManager
from logging_config import setup_logging
from typing import Dict, List, Tuple
Expand Down Expand Up @@ -49,15 +49,16 @@ def check_and_update_filename(new_filename):
def copy_or_move_file(tgt_filepath: str, move_to: str, src_filepath: str, target: str, test_mode, move=True):
new_src_path = os.path.join(move_to, os.path.relpath(tgt_filepath, target))
new_src_dir = os.path.dirname(new_src_path)
fm = file_manager.FileManager()
if not os.path.exists(new_src_dir) and not test_mode:
os.makedirs(new_src_dir)
fm.make_dirs(new_src_dir)
new_filename = check_and_update_filename(new_src_path)
src_to_dst = f"{src_filepath} to {new_filename}"
if not test_mode:
if move:
shutil.move(src_filepath, new_filename)
fm.move_file(src_filepath, new_filename)
else:
shutil.copy(src_filepath, new_filename)
fm.copy_file(src_filepath, new_filename)
logger.info(f"{'Moved' if move else 'Copied'} {src_to_dst}")
else:
logger.info(f"Test Mode: Would {'move' if move else 'copy'} {src_to_dst}")
Expand Down Expand Up @@ -103,16 +104,17 @@ def clean_source_duplications(args, keys_to_clean=None, given_duplicates: Dict[s

unique_duplicate_files_found += 1
start_index = 1 if not keys_to_clean else 0
fm = file_manager.FileManager()
# Move all the other files to a new folder under the move_to folder
for src_filepath, _ in group[start_index:]:
new_src_path = os.path.join(source_dups_move_to, os.path.relpath(src_filepath, source))
new_src_dir = os.path.dirname(new_src_path)
if not os.path.exists(new_src_dir) and args.run:
os.makedirs(new_src_dir)
fm.make_dirs(new_src_dir)
new_filename = check_and_update_filename(new_src_path)
src_to_dst = f"{src_filepath} to {new_filename}"
if args.run:
shutil.move(src_filepath, new_filename)
fm.move_file(src_filepath, new_filename)
logger.info(f"Moved {src_to_dst}")
duplicate_files_moved += 1

Expand Down Expand Up @@ -219,12 +221,13 @@ def delete_empty_folders_in_tree(base_path):
folders_by_depth[depth] = []
folders_by_depth[depth].append(root)

fm = file_manager.FileManager()
deleted_folders = 0
# delete empty folders starting from the deepest level excluding the base_path folder
for depth in sorted(folders_by_depth.keys(), reverse=True):
for folder in folders_by_depth[depth]:
if not os.listdir(folder):
os.rmdir(folder)
fm.rmdir(folder)
logger.info(f"Deleted empty folder {folder}")
deleted_folders += 1
return deleted_folders
Expand Down Expand Up @@ -286,10 +289,11 @@ def parse_arguments(cust_args=None):


def validate_duplicate_files_destination(duplicate_files_destination, run_mode):
fm = file_manager.FileManager()
if not os.path.isdir(duplicate_files_destination):
if run_mode:
try:
os.makedirs(duplicate_files_destination)
fm.make_dirs(duplicate_files_destination)
logger.info(f"Created destination folder {duplicate_files_destination}")
except Exception as e:
print_error(f"Error creating destination folder {duplicate_files_destination}: {e}")
Expand All @@ -308,10 +312,15 @@ def any_is_subfolder_of(folders: List[str]) -> bool:


def output_results(args, deleted_source_folders, duplicate_source_files_moved, files_created, files_moved, hash_manager):
summary_header = "Summary (Test Mode):" if not args.run else "Summary:"
separator = "-" * max(len(summary_header), 40)
cache_hits = f"Hash requests: {hash_manager.persistent_cache_requests + hash_manager.temporary_cache_requests}," + \
f" Cache hits: {hash_manager.persistent_cache_hits + hash_manager.temporary_cache_hits}"
logger.info(summary_header)
logger.info(separator)

logger.debug(cache_hits)
res_str = f'Summary{" (Test Mode)" if not args.run else ""}: Move: {files_moved} files, Create: {files_created} copies'
res_str = f'Move: {files_moved} files, Create: {files_created} copies'
if duplicate_source_files_moved:
res_str += f", Moved {duplicate_source_files_moved} duplicate files from the source folder"
if deleted_source_folders:
Expand Down Expand Up @@ -347,6 +356,9 @@ def main(args):
hash_manager = HashManager(target_folder=args.target if not detect_pytest() else None)
if args.clear_cache:
hash_manager.clear_cache()
fm = file_manager.FileManager().reset_protected_dirs()
fm.add_protected_dir(args.target)

(files_moved, files_created, deleted_source_folders, unique_source_duplicate_files_found,
duplicate_source_files_moved) = find_and_process_duplicates(args)
hash_manager.save_data()
Expand Down
86 changes: 86 additions & 0 deletions file_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from pathlib import Path
import shutil
import os


class FileManagerError(Exception):
    """Base class for every error raised by the file manager module."""


class ProtectedPathError(FileManagerError):
    """Raised when an operation would touch a protected path."""

    def __init__(self, message):
        """Store *message* as the exception text."""
        super(ProtectedPathError, self).__init__(message)


class FileManager:
    """Singleton gatekeeper for filesystem mutations.

    Directories registered through :meth:`add_protected_dir` (and everything
    below them) are treated as read-only: every mutating helper raises
    ``ProtectedPathError`` instead of touching them.

    Protection is decided on *resolved* paths so that symlinks and ``..``
    cannot be used to sneak into a protected directory, but the actual
    filesystem call is made on the path exactly as the caller supplied it.
    Operating on the resolved path instead would make a symlink argument act
    on the link's target rather than on the link itself.
    """

    _instance = None
    # Resolved ``Path`` objects of every protected directory.
    protected_dirs = set()

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use.

        Extra arguments are accepted for backward compatibility but are not
        forwarded: ``object.__new__`` raises ``TypeError`` when given any.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @staticmethod
    def _locations(path):
        """Return the filesystem locations an operation on *path* may affect.

        The first entry is the fully resolved path (what the path ultimately
        refers to).  The second is the directory entry itself: the resolved
        parent joined with the final component, which differs from the first
        when *path* is a symlink.
        """
        raw = Path(path)
        resolved = raw.resolve()
        # "" (root / current dir) and ".." never name a symlink entry, so the
        # resolved path already describes them exactly.
        if raw.name in ("", ".."):
            return (resolved,)
        return (resolved, raw.parent.resolve() / raw.name)

    def add_protected_dir(self, dir_path):
        """Register *dir_path* (resolved to an absolute path) as protected."""
        self.protected_dirs.add(Path(dir_path).resolve())

    def is_protected_path(self, path):
        """Return True if *path* is, lives in, or resolves into a protected dir.

        Raises:
            FileManagerError: if the protected directory set is ``None``.
        """
        if self.protected_dirs is None:  # This should never happen in real life
            raise FileManagerError("Protected directories not set")
        return any(
            candidate == protected_dir or protected_dir in candidate.parents
            for candidate in self._locations(path)
            for protected_dir in self.protected_dirs
        )

    def move_file(self, src, dst):
        """Move *src* to *dst*; both ends must be outside protected dirs.

        Returns True on success.

        Raises:
            ProtectedPathError: if either path is protected.
        """
        if self.is_protected_path(src) or self.is_protected_path(dst):
            raise ProtectedPathError(
                f"Operation not allowed: Attempt to move protected file or to protected directory: {src} -> {dst}")

        # Plain strings: shutil.move only accepts path-like src reliably on 3.9+.
        shutil.move(os.fspath(src), os.fspath(dst))
        return True

    def copy_file(self, src, dst):
        """Copy *src* to *dst* (with metadata); *dst* must not be protected.

        Reading from a protected directory is allowed.  Returns True on
        success.

        Raises:
            ProtectedPathError: if *dst* is protected.
        """
        if self.is_protected_path(dst):
            raise ProtectedPathError(
                f"Operation not allowed: Attempt to copy file to protected directory: {src} -> {dst}")

        shutil.copy2(os.fspath(src), os.fspath(dst))
        return True

    def delete_file(self, file_path):
        """Delete *file_path* (the link itself if it is a symlink).

        Returns True on success.

        Raises:
            ProtectedPathError: if *file_path* is protected.
        """
        if self.is_protected_path(file_path):
            resolved = Path(file_path).resolve()
            raise ProtectedPathError(f"Operation not allowed: Attempt to delete protected file: {resolved}")

        os.remove(os.fspath(file_path))
        return True

    def make_dirs(self, dir_path):
        """Create *dir_path* and any missing parents.

        Returns True on success.

        Raises:
            ProtectedPathError: if *dir_path* is protected.
            FileExistsError: if the directory already exists.
        """
        if self.is_protected_path(dir_path):
            resolved = Path(dir_path).resolve()
            raise ProtectedPathError(f"Operation not allowed: Attempt to create directory in protected path: {resolved}")

        os.makedirs(os.fspath(dir_path))
        return True

    def rmdir(self, dir_path):
        """Remove the empty directory *dir_path*.

        Returns True on success.

        Raises:
            ProtectedPathError: if *dir_path* is protected.
        """
        if self.is_protected_path(dir_path):
            resolved = Path(dir_path).resolve()
            raise ProtectedPathError(f"Operation not allowed: Attempt to delete protected directory: {resolved}")

        os.rmdir(os.fspath(dir_path))
        return True

    def reset_protected_dirs(self):
        """Forget every protected directory and return self for chaining."""
        self.protected_dirs = set()
        return self
4 changes: 4 additions & 0 deletions tests/helpers_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from hash_manager import HashManager
from logging_config import setup_logging
import file_manager

# Define the base directory for the tests
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
Expand Down Expand Up @@ -64,6 +65,9 @@ def setup_teardown():
HashManager.reset_instance()
HashManager(target_folder=target_dir, filename=hash_file)

fm = file_manager.FileManager().reset_protected_dirs()
fm.add_protected_dir(target_dir)

os.makedirs(source_dir)
os.makedirs(target_dir)
os.makedirs(move_to_dir)
Expand Down
144 changes: 144 additions & 0 deletions tests/test_file_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import file_manager
from tests.helpers_testing import *

# FileManager is supposed to protect some directories from being moved, copied or deleted.


def test_move_file(setup_teardown):
    """Moves into or out of a protected directory are refused; others succeed."""
    source_dir, target_dir, move_to_dir, _common_args = setup_teardown
    setup_test_files(range(1, 6), [2])
    manager = file_manager.FileManager()
    manager.add_protected_dir(target_dir)

    source_file = os.path.join(source_dir, "1.jpg")
    protected_dst = os.path.join(target_dir, "1.jpg")
    allowed_dst = os.path.join(move_to_dir, "1.jpg")

    # Destination inside the protected directory: must be rejected.
    with pytest.raises(file_manager.ProtectedPathError):
        manager.move_file(source_file, protected_dst)

    # Neither end protected: the file really moves.
    manager.move_file(source_file, allowed_dst)
    assert os.path.exists(allowed_dst)
    assert not os.path.exists(source_file)

    # Source inside the protected directory: must be rejected and left alone.
    protected_src = os.path.join(target_dir, "2.jpg")
    rejected_dst = os.path.join(move_to_dir, "2.jpg")
    with pytest.raises(file_manager.ProtectedPathError):
        manager.move_file(protected_src, rejected_dst)
    assert os.path.exists(protected_src)
    assert not os.path.exists(rejected_dst)


def test_copy_file(setup_teardown):
    """Copies may read from a protected directory but never write into one."""
    source_dir, target_dir, move_to_dir, _common_args = setup_teardown
    setup_test_files(range(1, 6), [2, 3])
    manager = file_manager.FileManager()
    manager.add_protected_dir(target_dir)

    unprotected_src = os.path.join(source_dir, "1.jpg")

    # unprotected -> protected: rejected
    with pytest.raises(file_manager.ProtectedPathError):
        manager.copy_file(unprotected_src, os.path.join(target_dir, "1.jpg"))

    # unprotected -> unprotected: allowed, source is kept
    first_copy = os.path.join(move_to_dir, "1.jpg")
    manager.copy_file(unprotected_src, first_copy)
    assert os.path.exists(first_copy)
    assert os.path.exists(unprotected_src)

    # protected -> unprotected: allowed, source is kept
    protected_src = os.path.join(target_dir, "2.jpg")
    second_copy = os.path.join(move_to_dir, "2.jpg")
    manager.copy_file(protected_src, second_copy)
    assert os.path.exists(second_copy)
    assert os.path.exists(protected_src)

    # protected -> protected: rejected, nothing is created
    protected_src = os.path.join(target_dir, "3.jpg")
    forbidden_copy = os.path.join(target_dir, "4.jpg")
    with pytest.raises(file_manager.ProtectedPathError):
        manager.copy_file(protected_src, forbidden_copy)
    assert os.path.exists(protected_src)
    assert not os.path.exists(forbidden_copy)


def test_delete_file(setup_teardown):
    """Deleting works outside protected directories and is refused inside."""
    source_dir, target_dir, move_to_dir, _common_args = setup_teardown
    setup_test_files(range(1, 6), [2, 3])
    manager = file_manager.FileManager()
    manager.add_protected_dir(target_dir)

    # Unprotected file: removed.
    removable = os.path.join(source_dir, "1.jpg")
    manager.delete_file(removable)
    assert not os.path.exists(removable)

    # Protected file: refused and still on disk.
    guarded = os.path.join(target_dir, "2.jpg")
    with pytest.raises(file_manager.ProtectedPathError):
        manager.delete_file(guarded)
    assert os.path.exists(guarded)


def test_make_dirs(setup_teardown):
    """Directories can be created anywhere except under a protected path."""
    source_dir, target_dir, move_to_dir, _common_args = setup_teardown
    manager = file_manager.FileManager()
    manager.add_protected_dir(target_dir)

    # Single level outside the protected directory: created.
    allowed_dir = os.path.join(source_dir, "new_dir")
    manager.make_dirs(allowed_dir)
    assert os.path.exists(allowed_dir)

    # Inside the protected directory: refused, nothing created.
    forbidden_dir = os.path.join(target_dir, "new_dir")
    with pytest.raises(file_manager.ProtectedPathError):
        manager.make_dirs(forbidden_dir)
    assert not os.path.exists(forbidden_dir)

    # Several missing levels at once are created too.
    nested_dir = os.path.join(source_dir, "another_new_dir", "sub_dir", "sub_sub_dir")
    manager.make_dirs(nested_dir)
    assert os.path.exists(nested_dir)


def test_rmdir(setup_teardown):
    """Empty directories can be removed unless they are under a protected path."""
    source_dir, target_dir, move_to_dir, _common_args = setup_teardown
    manager = file_manager.FileManager()
    manager.add_protected_dir(target_dir)

    # Outside the protected directory: removed.
    removable = os.path.join(source_dir, "new_dir")
    os.makedirs(removable)
    manager.rmdir(removable)
    assert not os.path.exists(removable)

    # Inside the protected directory: refused and left in place.
    guarded = os.path.join(target_dir, "new_dir")
    os.makedirs(guarded)
    with pytest.raises(file_manager.ProtectedPathError):
        manager.rmdir(guarded)
    assert os.path.exists(guarded)

    # The leaf of a multi-level path can be removed as well.
    nested_leaf = os.path.join(source_dir, "another_new_dir", "sub_dir", "sub_sub_dir")
    os.makedirs(nested_leaf)
    manager.rmdir(nested_leaf)
    assert not os.path.exists(nested_leaf)


# The FileManager class should be a singleton, so we should not be able to create multiple instances of it.
def test_singleton():
    """Two constructions must hand back the very same FileManager object."""
    first = file_manager.FileManager()
    second = file_manager.FileManager()
    assert first is not None
    assert second is not None
    assert first is second
    assert first == second


def test_reset_protected_dirs():
    """After a reset no directory is registered as protected any more."""
    manager = file_manager.FileManager()
    for drive in ("C:\\", "D:\\"):
        manager.add_protected_dir(drive)
    manager.reset_protected_dirs()
    assert len(manager.protected_dirs) == 0
    assert manager.protected_dirs == set()

0 comments on commit 7c76c58

Please sign in to comment.