Skip to content

Commit

Permalink
1. whitelist mode for FileManager
Browse files Browse the repository at this point in the history
2. df_finder3.py now checks also it access only allowed folder
  • Loading branch information
niradar committed May 28, 2024
1 parent 7c76c58 commit d858ff5
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 11 deletions.
8 changes: 6 additions & 2 deletions df_finder3.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ def detect_pytest():

def main(args):
setup_logging()
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir(args.target)
fm.add_allowed_dir(args.src)
fm.add_allowed_dir(args.move_to)

validate_folder(args.src, "Source")
validate_folder(args.target, "Target")
validate_duplicate_files_destination(args.move_to, args.run)
Expand All @@ -356,8 +361,7 @@ def main(args):
hash_manager = HashManager(target_folder=args.target if not detect_pytest() else None)
if args.clear_cache:
hash_manager.clear_cache()
fm = file_manager.FileManager().reset_protected_dirs()
fm.add_protected_dir(args.target)


(files_moved, files_created, deleted_source_folders, unique_source_duplicate_files_found,
duplicate_source_files_moved) = find_and_process_duplicates(args)
Expand Down
34 changes: 32 additions & 2 deletions file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self, message):
class FileManager:
_instance = None
protected_dirs = set()
allowed_dirs = set() # If set, only operations in these directories are allowed. Acts as a whitelist

def __new__(cls, *args, **kwargs):
if not cls._instance:
Expand All @@ -23,14 +24,33 @@ def __new__(cls, *args, **kwargs):

def add_protected_dir(self, dir_path):
protected_dir = Path(dir_path).resolve()

# raise an error if the directory is already in the allowed_dirs
if self.allowed_dirs and protected_dir in self.allowed_dirs:
raise FileManagerError(f"Attempt to protect a directory that is also in the allowed_dirs: {dir_path}")

if protected_dir not in self.protected_dirs:
self.protected_dirs.add(protected_dir)

def add_allowed_dir(self, dir_path):
allowed_dir = Path(dir_path).resolve()

# raise an error if the directory is already in the protected_dirs
if self.protected_dirs and allowed_dir in self.protected_dirs:
raise FileManagerError(f"Attempt to allow a directory that is also in the protected_dirs: {dir_path}")

if allowed_dir not in self.allowed_dirs:
self.allowed_dirs.add(allowed_dir)

def is_protected_path(self, path):
path = Path(path).resolve()
if self.protected_dirs is None: # This should never happen in real life
raise FileManagerError("Protected directories not set")
return any(path == protected_dir or protected_dir in path.parents for protected_dir in self.protected_dirs)

# True if the path is in any of the protected directories or if it is not in any of the allowed directories
return any(path == protected_dir or protected_dir in path.parents for protected_dir in self.protected_dirs) or \
(self.allowed_dirs and not any(path == allowed_dir or allowed_dir in path.parents for allowed_dir
in self.allowed_dirs))

def move_file(self, src, dst):
src_path = Path(src).resolve()
Expand Down Expand Up @@ -67,7 +87,8 @@ def make_dirs(self, dir_path):
dir_path = Path(dir_path).resolve()

if self.is_protected_path(dir_path):
raise ProtectedPathError(f"Operation not allowed: Attempt to create directory in protected path: {dir_path}")
raise ProtectedPathError(f"Operation not allowed: Attempt to create directory in protected path: "
f"{dir_path}")

os.makedirs(dir_path)
return True
Expand All @@ -84,3 +105,12 @@ def rmdir(self, dir_path):
def reset_protected_dirs(self):
self.protected_dirs = set()
return self

def reset_allowed_dirs(self):
self.allowed_dirs = set()
return self

def reset_all(self):
self.reset_protected_dirs()
self.reset_allowed_dirs()
return self
4 changes: 3 additions & 1 deletion tests/helpers_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ def setup_teardown():
HashManager.reset_instance()
HashManager(target_folder=target_dir, filename=hash_file)

fm = file_manager.FileManager().reset_protected_dirs()
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir(target_dir)
fm.add_allowed_dir(source_dir)
fm.add_allowed_dir(move_to_dir)

os.makedirs(source_dir)
os.makedirs(target_dir)
Expand Down
94 changes: 88 additions & 6 deletions tests/test_file_manager.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import file_manager
from tests.helpers_testing import *
from pathlib import Path

# FileManager suppose to protect some directories from being moved, copied or deleted.


def test_move_file(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
setup_test_files(range(1, 6), [2])
fm = file_manager.FileManager()
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir(target_dir)
file_to_move = os.path.join(source_dir, "1.jpg")
dst_file = os.path.join(target_dir, "1.jpg")
Expand All @@ -28,11 +29,22 @@ def test_move_file(setup_teardown):
assert os.path.exists(file_to_move)
assert not os.path.exists(os.path.join(move_to_dir, "2.jpg"))

# now add allowed directory setting
fm.add_allowed_dir(source_dir)
file_to_move = os.path.join(source_dir, "3.jpg")
with pytest.raises(file_manager.ProtectedPathError):
fm.move_file(file_to_move, os.path.join(move_to_dir, "3.jpg")) # should fail as move_to_dir is not allowed
assert os.path.exists(file_to_move)
assert not os.path.exists(os.path.join(move_to_dir, "3.jpg"))

fm.add_allowed_dir(move_to_dir)
fm.move_file(file_to_move, os.path.join(move_to_dir, "3.jpg")) # should work now


def test_copy_file(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
setup_test_files(range(1, 6), [2, 3])
fm = file_manager.FileManager()
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir(target_dir)
file_to_copy = os.path.join(source_dir, "1.jpg")
dst_file = os.path.join(target_dir, "1.jpg")
Expand All @@ -59,11 +71,24 @@ def test_copy_file(setup_teardown):
assert os.path.exists(file_to_copy)
assert not os.path.exists(os.path.join(target_dir, "4.jpg"))

# now add allowed directory setting
fm.add_allowed_dir(source_dir)
file_to_copy = os.path.join(source_dir, "5.jpg")
with pytest.raises(file_manager.ProtectedPathError):
fm.copy_file(file_to_copy, os.path.join(move_to_dir, "5.jpg"))
assert os.path.exists(file_to_copy)
assert not os.path.exists(os.path.join(move_to_dir, "5.jpg"))

fm.add_allowed_dir(move_to_dir)
fm.copy_file(file_to_copy, os.path.join(move_to_dir, "5.jpg")) # should work now
assert os.path.exists(os.path.join(move_to_dir, "5.jpg"))
assert os.path.exists(file_to_copy)


def test_delete_file(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
setup_test_files(range(1, 6), [2, 3])
fm = file_manager.FileManager()
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir(target_dir)
file_to_delete = os.path.join(source_dir, "1.jpg")

Expand All @@ -77,10 +102,25 @@ def test_delete_file(setup_teardown):
fm.delete_file(file_to_delete)
assert os.path.exists(file_to_delete)

# copy 3.jpg to move_to_dir
shutil.copy(os.path.join(source_dir, "3.jpg"), os.path.join(move_to_dir, "3.jpg"))

# now add allowed directory setting - source should be allowed but move_to_dir should not be allowed
fm.add_allowed_dir(source_dir)
file_to_delete = os.path.join(source_dir, "3.jpg")

fm.delete_file(file_to_delete)
assert not os.path.exists(file_to_delete)

file_to_delete = os.path.join(move_to_dir, "3.jpg")
with pytest.raises(file_manager.ProtectedPathError):
fm.delete_file(file_to_delete)
assert os.path.exists(file_to_delete)


def test_make_dirs(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
fm = file_manager.FileManager()
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir(target_dir)
dir_to_make = os.path.join(source_dir, "new_dir")

Expand All @@ -99,10 +139,21 @@ def test_make_dirs(setup_teardown):
fm.make_dirs(dir_to_make)
assert os.path.exists(dir_to_make)

# now add allowed directory setting
fm.add_allowed_dir(source_dir)
dir_to_make = os.path.join(source_dir, "another_new_dir", "sub_dir", "sub_sub_dir2")
fm.make_dirs(dir_to_make)
assert os.path.exists(dir_to_make)

dir_to_make = os.path.join(move_to_dir, "another_new_dir", "sub_dir", "sub_sub_dir3")
with pytest.raises(file_manager.ProtectedPathError):
fm.make_dirs(dir_to_make)
assert not os.path.exists(dir_to_make)


def test_rmdir(setup_teardown):
source_dir, target_dir, move_to_dir, common_args = setup_teardown
fm = file_manager.FileManager()
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir(target_dir)
dir_to_remove = os.path.join(source_dir, "new_dir")
os.makedirs(dir_to_remove)
Expand All @@ -124,6 +175,19 @@ def test_rmdir(setup_teardown):
fm.rmdir(dir_to_remove)
assert not os.path.exists(dir_to_remove)

# now add allowed directory setting
fm.add_allowed_dir(source_dir)
dir_to_remove = os.path.join(source_dir, "another_new_dir", "sub_dir", "sub_sub_dir2")
os.makedirs(dir_to_remove)
fm.rmdir(dir_to_remove)
assert not os.path.exists(dir_to_remove)

dir_to_remove = os.path.join(move_to_dir, "another_new_dir", "sub_dir", "sub_sub_dir3")
os.makedirs(dir_to_remove)
with pytest.raises(file_manager.ProtectedPathError):
fm.rmdir(dir_to_remove)
assert os.path.exists(dir_to_remove)


# The FileManager class should be a singleton, so we should not be able to create multiple instances of it.
def test_singleton():
Expand All @@ -136,9 +200,27 @@ def test_singleton():


def test_reset_protected_dirs():
fm = file_manager.FileManager()
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir("C:\\")
fm.add_protected_dir("D:\\")
fm.reset_protected_dirs()
assert len(fm.protected_dirs) == 0
assert fm.protected_dirs == set()


def test_reset_allowed_dirs():
fm = file_manager.FileManager().reset_all()
fm.add_allowed_dir("C:\\")
fm.add_allowed_dir("D:\\")
fm.reset_allowed_dirs()
assert len(fm.allowed_dirs) == 0
assert fm.allowed_dirs == set()


def test_add_protected_dir():
fm = file_manager.FileManager().reset_all()
fm.add_protected_dir("C:\\")
fm.add_protected_dir("D:\\")
assert len(fm.protected_dirs) == 2
assert Path("C:\\").resolve() in fm.protected_dirs
assert Path("D:\\").resolve() in fm.protected_dirs

0 comments on commit d858ff5

Please sign in to comment.