Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for running on Windows to the Filesystem component and fix path handling on Windows #521

Merged
merged 16 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ofrak_core/ofrak/core/dtb.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
For more information see: https://devicetree-specification.readthedocs.io/en/stable/flattened-format.html
"""

import os
import posixpath
import struct
from dataclasses import dataclass
from enum import Enum
Expand Down Expand Up @@ -55,7 +55,7 @@ async def get_path(self) -> str:
return self.name

parent_node = await self.resource.get_parent_as_view(v_type=DtbNode)
return os.path.join(await parent_node.get_path(), self.name)
return posixpath.join(await parent_node.get_path(), self.name)


class DeviceTreeBlob(GenericBinary):
Expand Down Expand Up @@ -183,7 +183,7 @@ def caption(cls, attributes) -> str:

async def get_path(self):
parent_node = await self.resource.get_parent_as_view(v_type=DtbNode)
return os.path.join(await parent_node.get_path(), self.name)
return posixpath.join(await parent_node.get_path(), self.name)

async def get_value(self) -> Union[str, List[str], int, List[int], bytes, bytearray, None]:
if self.p_type is DtbPropertyType.DtbPropNoValue:
Expand Down Expand Up @@ -321,7 +321,7 @@ async def pack(self, resource: Resource, config: ComponentConfig = None):
):
# By default, add_item adds the missing nodes to complete the path of a previous node
if not dtb.exist_node(await node.get_path()):
dtb.add_item(fdt.Node(node.name), os.path.dirname(await node.get_path()))
dtb.add_item(fdt.Node(node.name), posixpath.dirname(await node.get_path()))
for prop in await node.resource.get_children_as_view(
v_type=DtbProperty,
r_filter=ResourceFilter(tags=[DtbProperty]),
Expand Down
137 changes: 97 additions & 40 deletions ofrak_core/ofrak/core/filesystem.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import os
import stat
import sys
import warnings
import tempfile
import posixpath
from pathlib import PurePath
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Type, Union

Expand All @@ -22,6 +26,14 @@
)


def _warn_chmod_chown_windows(): # pragma: no cover
warnings.warn(
f"os.chown and os.chmod do not work on Windows platforms. \
Unix-like file ownership and permissions will not be properly handled while using OFRAK on this platform. \
If you require extended attributes, please use a different platform."
)
alchzh marked this conversation as resolved.
Show resolved Hide resolved


@dataclass
class FilesystemEntry(ResourceView):
"""
Expand Down Expand Up @@ -109,6 +121,7 @@ async def get_path(self) -> str:
Get a folder's path, with the `FilesystemRoot` as the path root.

:return: The full path name, with the `FilesystemRoot` ancestor as the path root
Always returns POSIX style paths with forward slashes.
alchzh marked this conversation as resolved.
Show resolved Hide resolved
"""
path = [self.get_name()]

Expand All @@ -122,9 +135,8 @@ async def get_path(self) -> str:
break
a_view = await a.view_as(FilesystemEntry)
path.append(a_view.get_name())
path.reverse()

return os.path.join(*path)
return posixpath.join(*reversed(path))

def apply_stat_attrs(self, path: str):
"""
Expand All @@ -134,8 +146,11 @@ def apply_stat_attrs(self, path: str):
:param path: Path on disk to set attributes of.
"""
if self.stat:
os.chown(path, self.stat.st_uid, self.stat.st_gid)
os.chmod(path, self.stat.st_mode)
if sys.platform == "win32":
_warn_chmod_chown_windows()
else:
os.chown(path, self.stat.st_uid, self.stat.st_gid)
os.chmod(path, self.stat.st_mode)
os.utime(path, (self.stat.st_atime, self.stat.st_mtime))
if self.xattrs:
for attr, value in self.xattrs.items():
Expand Down Expand Up @@ -173,11 +188,16 @@ async def flush_to_disk(self, root_path: str = ".", filename: Optional[str] = No
os.symlink(link_view.source_path, link_name)
assert len(list(await self.resource.get_children())) == 0
if self.stat:
# https://docs.python.org/3/library/os.html#os.supports_follow_symlinks
if os.chown in os.supports_follow_symlinks:
os.chown(link_name, self.stat.st_uid, self.stat.st_gid, follow_symlinks=False)
if os.chmod in os.supports_follow_symlinks:
os.chmod(link_name, self.stat.st_mode, follow_symlinks=False)
if sys.platform == "win32":
_warn_chmod_chown_windows()
else:
# https://docs.python.org/3/library/os.html#os.supports_follow_symlinks
if os.chown in os.supports_follow_symlinks:
os.chown(
link_name, self.stat.st_uid, self.stat.st_gid, follow_symlinks=False
)
if os.chmod in os.supports_follow_symlinks:
os.chmod(link_name, self.stat.st_mode, follow_symlinks=False)
if os.utime in os.supports_follow_symlinks:
os.utime(
link_name,
Expand All @@ -198,21 +218,42 @@ async def flush_to_disk(self, root_path: str = ".", filename: Optional[str] = No
self.apply_stat_attrs(file_name)
elif self.is_device():
device_name = os.path.join(root_path, entry_path)
if self.stat is None:
raise ValueError(
if sys.platform == "win32" or not hasattr(os, "mknod"):
warnings.warn(
f"Cannot create a device {entry_path} for a "
f"BlockDevice or CharacterDevice resource with no stat!"
f"BlockDevice or CharacterDevice resource on platform {sys.platform}! "
f"Creating an empty regular file instead."
)
os.mknod(device_name, self.stat.st_mode, self.stat.st_rdev)
self.apply_stat_attrs(device_name)
with open(device_name, "w") as f:
pass
self.apply_stat_attrs(device_name)
else:
if self.stat is None:
raise ValueError(
f"Cannot create a device {entry_path} for a "
f"BlockDevice or CharacterDevice resource with no stat!"
)
os.mknod(device_name, self.stat.st_mode, self.stat.st_rdev)
self.apply_stat_attrs(device_name)
alchzh marked this conversation as resolved.
Show resolved Hide resolved
elif self.is_fifo_pipe():
fifo_name = os.path.join(root_path, entry_path)
if self.stat is None:
raise ValueError(
f"Cannot create a fifo {entry_path} for a FIFOPipe resource " "with no stat!"
if sys.platform == "win32" or not hasattr(os, "mkfifo"):
warnings.warn(
f"Cannot create a fifo {entry_path} for a "
f"FIFOPipe resource on platform {sys.platform}! "
f"Creating an empty regular file instead."
)
os.mkfifo(fifo_name, self.stat.st_mode)
self.apply_stat_attrs(fifo_name)
with open(fifo_name, "w") as f:
pass
self.apply_stat_attrs(fifo_name)
else:
if self.stat is None:
raise ValueError(
f"Cannot create a fifo {entry_path} for a FIFOPipe resource "
"with no stat!"
)
os.mkfifo(fifo_name, self.stat.st_mode)
self.apply_stat_attrs(fifo_name)
alchzh marked this conversation as resolved.
Show resolved Hide resolved
else:
entry_info = f"Stat: {stat.S_IFMT(self.stat.st_mode):o}" if self.stat else ""
raise NotImplementedError(
Expand Down Expand Up @@ -245,7 +286,7 @@ async def get_entry(self, path: str) -> Optional[FilesystemEntry]:
:return: The child `FilesystemEntry` resource that was found. If nothing was found, `None`
is returned
"""
basename = os.path.basename(path)
basename = posixpath.basename(path)
# only searching paths with the same base name should reduce the search space by quite a lot
descendants = await self.resource.get_descendants_as_view(
FilesystemEntry,
Expand Down Expand Up @@ -313,6 +354,13 @@ class CharacterDevice(SpecialFileType):
"""


def _path_to_posixpath(path: str) -> str:
"""
Converts an OS-specific path to a POSIX style path
"""
return str(PurePath(path).as_posix())


@dataclass
class FilesystemRoot(ResourceView):
"""
Expand Down Expand Up @@ -341,7 +389,10 @@ async def initialize_from_disk(
for root, dirs, files in os.walk(root_path):
for d in sorted(dirs):
absolute_path = os.path.join(root, d)
relative_path = os.path.join(os.path.relpath(root, root_path), d)
relative_path_posix = posixpath.join(
_path_to_posixpath(os.path.relpath(root, root_path)), d
)

folder_attributes_stat = os.lstat(absolute_path)

mode = folder_attributes_stat.st_mode
Expand All @@ -365,23 +416,25 @@ async def initialize_from_disk(
folder_attributes_xattr = self._get_xattr_map(absolute_path)
if os.path.islink(absolute_path):
await self.add_special_file_entry(
relative_path,
relative_path_posix,
SymbolicLink(
relative_path,
relative_path_posix,
folder_attributes_stat,
folder_attributes_xattr,
os.readlink(absolute_path),
),
)
else:
await self.add_folder(
relative_path,
relative_path_posix,
folder_attributes_stat,
folder_attributes_xattr,
)
for f in sorted(files):
absolute_path = os.path.join(root, f)
relative_path = os.path.normpath(os.path.join(os.path.relpath(root, root_path), f))
relative_path_posix = _path_to_posixpath(
os.path.normpath(os.path.join(os.path.relpath(root, root_path), f))
)
file_attributes_stat = os.lstat(absolute_path)

mode = file_attributes_stat.st_mode
Expand All @@ -402,9 +455,9 @@ async def initialize_from_disk(
file_attributes_xattr = self._get_xattr_map(absolute_path)
if os.path.islink(absolute_path):
await self.add_special_file_entry(
relative_path,
relative_path_posix,
SymbolicLink(
relative_path,
relative_path_posix,
file_attributes_stat,
file_attributes_xattr,
os.readlink(absolute_path),
Expand All @@ -413,25 +466,29 @@ async def initialize_from_disk(
elif os.path.isfile(absolute_path):
with open(absolute_path, "rb") as fh:
await self.add_file(
relative_path,
relative_path_posix,
fh.read(),
file_attributes_stat,
file_attributes_xattr,
)
elif stat.S_ISFIFO(mode):
await self.add_special_file_entry(
relative_path,
FIFOPipe(relative_path, file_attributes_stat, file_attributes_xattr),
relative_path_posix,
FIFOPipe(relative_path_posix, file_attributes_stat, file_attributes_xattr),
)
elif stat.S_ISBLK(mode):
await self.add_special_file_entry(
relative_path,
BlockDevice(relative_path, file_attributes_stat, file_attributes_xattr),
relative_path_posix,
BlockDevice(
relative_path_posix, file_attributes_stat, file_attributes_xattr
),
)
elif stat.S_ISCHR(mode):
await self.add_special_file_entry(
relative_path,
CharacterDevice(relative_path, file_attributes_stat, file_attributes_xattr),
relative_path_posix,
CharacterDevice(
relative_path_posix, file_attributes_stat, file_attributes_xattr
),
)
else:
raise NotImplementedError(
Expand Down Expand Up @@ -482,7 +539,7 @@ async def get_entry(self, path: str):

:return: the descendant `FilesystemEntry`, if found; otherwise, returns `None`
"""
basename = os.path.basename(path)
basename = posixpath.basename(path)
# only searching paths with the same base name should reduce the search space by quite a lot
descendants = await self.resource.get_descendants_as_view(
FilesystemEntry,
Expand All @@ -492,7 +549,7 @@ async def get_entry(self, path: str):
)

for d in descendants:
if await d.get_path() == os.path.normpath(path):
if await d.get_path() == posixpath.normpath(path):
return d
return None

Expand Down Expand Up @@ -533,7 +590,7 @@ async def add_folder(
"""
# Normalizes and cleans up paths beginning with "./" and containing "./../" as well as
# other extraneous separators
split_dir = os.path.normpath(path).rstrip("/").lstrip("/").split("/")
split_dir = posixpath.normpath(path).strip("/").split("/")

parent: Union[FilesystemRoot, Folder] = self
for directory in split_dir:
Expand Down Expand Up @@ -585,8 +642,8 @@ async def add_file(

:return: the `File` resource that was added to the `FilesystemRoot`
"""
dirname = os.path.dirname(path)
filename = os.path.basename(path)
dirname = posixpath.dirname(path)
filename = posixpath.basename(path)

if dirname == "":
parent_folder = self
Expand Down Expand Up @@ -639,7 +696,7 @@ async def add_special_file_entry(

:return: The special `FilesystemEntry` resource that was added to the `FilesystemRoot`
"""
dirname = os.path.dirname(path)
dirname = posixpath.dirname(path)

if dirname == "":
parent_folder = self
Expand Down
6 changes: 3 additions & 3 deletions ofrak_core/ofrak/core/iso9660.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import logging
import os
import posixpath
import tempfile
from dataclasses import dataclass
from io import BytesIO
Expand Down Expand Up @@ -195,7 +195,7 @@ async def unpack(self, resource: Resource, config=None):

for root, dirs, files in iso.walk(**{path_var: "/"}):
for d in dirs:
path = os.path.join(root, d)
path = posixpath.join(root, d)
folder_tags = (ISO9660Entry, Folder)
entry = ISO9660Entry(
name=d,
Expand All @@ -211,7 +211,7 @@ async def unpack(self, resource: Resource, config=None):
path, None, None, folder_tags, entry.get_attributes_instances().values()
)
for f in files:
path = os.path.join(root, f)
path = posixpath.join(root, f)
file_tags = (ISO9660Entry, File)
fp = BytesIO()

Expand Down
2 changes: 1 addition & 1 deletion ofrak_core/ofrak/core/tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async def unpack(self, resource: Resource, config: ComponentConfig = None) -> No
for filename in stdout.decode().splitlines():
# Handles relative parent paths and rooted paths, and normalizes paths like "./../"
rel_filename = os.path.relpath(filename)
if rel_filename.startswith("../"):
if rel_filename.startswith(".." + os.sep):
raise UnpackerError(
f"Tar archive contains a file {filename} that would extract to a parent "
f"directory {rel_filename}."
Expand Down
Loading
Loading